def store_job_data()

in treeherder/etl/jobs.py [0:0]


def store_job_data(repository, original_data):
    """
    Store job data instances into jobs db

    Example:
    [
        {
            "revision": "24fd64b8251fac5cf60b54a915bffa7e51f636b5",
            "job": {
                "job_guid": "d19375ce775f0dc166de01daa5d2e8a73a8e8ebf",
                "name": "xpcshell",
                "desc": "foo",
                "job_symbol": "XP",
                "group_name": "Shelliness",
                "group_symbol": "XPC",
                "product_name": "firefox",
                "state": "TODO",
                "result": 0,
                "reason": "scheduler",
                "who": "sendchange-unittest",
                "submit_timestamp": 1365732271,
                "start_timestamp": "20130411165317",
                "end_timestamp": "1365733932",
                "machine": "tst-linux64-ec2-314",
                "build_platform": {
                    "platform": "Ubuntu VM 12.04",
                    "os_name": "linux",
                    "architecture": "x86_64"
                },
                "machine_platform": {
                    "platform": "Ubuntu VM 12.04",
                    "os_name": "linux",
                    "architecture": "x86_64"
                },
                "option_collection": {
                    "opt": true
                },
                "log_references": [
                    {
                        "url": "http://ftp.mozilla.org/pub/...",
                        "name": "unittest"
                    }
                ],
            },
            "superseded": []
        },
        ...
    ]

    """
    # Deep-copy so that the error-path mutation below (datum.update) never
    # alters the caller's data.
    data = copy.deepcopy(original_data)
    # Ensure that we have job data to process
    if not data:
        return

    # remove any existing jobs that already have the same state
    data = _remove_existing_jobs(data)
    if not data:
        return

    # guids of jobs superseded by the jobs ingested below; they are marked
    # result="superseded" in one bulk update at the end.
    superseded_job_guids = []

    # TODO: Refactor this now that store_job_data() is only over called with one job at a time.
    for datum in data:
        try:
            # TODO: this might be a good place to check the datum against
            # a JSON schema to ensure all the fields are valid.  Then
            # the exception we caught would be much more informative.  That
            # being said, if/when we transition to only using the pulse
            # job consumer, then the data will always be vetted with a
            # JSON schema before we get to this point.
            job = datum["job"]
            revision = datum["revision"]
            superseded = datum.get("superseded", [])

            # A short revision (< 40 hex chars) is matched as a prefix;
            # a full-length sha is matched exactly.
            revision_field = "revision__startswith" if len(revision) < 40 else "revision"
            filter_kwargs = {"repository": repository, revision_field: revision}
            push_id = Push.objects.values_list("id", flat=True).get(**filter_kwargs)

            # load job (the returned guid is not needed for the superseded
            # bookkeeping below, which only tracks the superseded guids)
            _load_job(repository, job, push_id)

            superseded_job_guids.extend(superseded)
        except Exception as e:
            # Surface the error immediately unless running in production, where we'd
            # rather report it on New Relic and not block storing the remaining jobs.
            # TODO: Once buildbot support is removed, remove this as part of
            # refactoring this method to process just one job at a time.
            if "DYNO" not in os.environ:
                raise

            logger.exception(e)
            # make more fields visible in new relic for the job
            # where we encountered the error
            datum.update(datum.get("job", {}))
            newrelic.agent.notice_error(attributes=datum)

            # skip any jobs that hit errors in these stages.
            continue

    # Update the result/state of any jobs that were superseded by those
    # ingested above, in a single bulk query.
    if superseded_job_guids:
        Job.objects.filter(guid__in=superseded_job_guids).update(
            result="superseded", state="completed"
        )