in treeherder/etl/jobs.py [0:0]
def _load_job(repository, job_datum, push_id):
"""
Load a job into the treeherder database
If the job is a ``retry`` the ``job_guid`` will have a special
suffix on it. But the matching ``pending``/``running`` job will not.
So we append the suffixed ``job_guid`` to ``retry_job_guids``
so that we can update the job_id_lookup later with the non-suffixed
``job_guid`` (root ``job_guid``). Then we can find the right
``pending``/``running`` job and update it with this ``retry`` job.
"""
build_platform, _ = BuildPlatform.objects.get_or_create(
os_name=job_datum.get("build_platform", {}).get("os_name", "unknown"),
platform=job_datum.get("build_platform", {}).get("platform", "unknown"),
architecture=job_datum.get("build_platform", {}).get("architecture", "unknown"),
)
machine_platform, _ = MachinePlatform.objects.get_or_create(
os_name=job_datum.get("machine_platform", {}).get("os_name", "unknown"),
platform=job_datum.get("machine_platform", {}).get("platform", "unknown"),
architecture=job_datum.get("machine_platform", {}).get("architecture", "unknown"),
)
    option_names = job_datum.get("option_collection", [])
    option_collection_hash = OptionCollection.calculate_hash(option_names)
    if not OptionCollection.objects.filter(option_collection_hash=option_collection_hash).exists():
        # in the unlikely event that we haven't seen this set of options
        # before, add the appropriate database rows: one OptionCollection
        # row per option, all sharing the same hash
        for option_name in option_names:
            option, _ = Option.objects.get_or_create(name=option_name)
            OptionCollection.objects.create(
                option_collection_hash=option_collection_hash, option=option
            )
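    # For example (hypothetical values): option_names == ["debug", "asan"]
    # produces two OptionCollection rows sharing a single
    # option_collection_hash; that shared hash is what the Job row created
    # below refers to.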
machine, _ = Machine.objects.get_or_create(name=job_datum.get("machine", "unknown"))
job_type, _ = JobType.objects.get_or_create(
symbol=job_datum.get("job_symbol") or "unknown", name=job_datum.get("name") or "unknown"
)
job_group, _ = JobGroup.objects.get_or_create(
name=job_datum.get("group_name") or "unknown",
symbol=job_datum.get("group_symbol") or "unknown",
)
    product_name = job_datum.get("product_name") or "unknown"
    if not product_name.strip():
        product_name = "unknown"
product, _ = Product.objects.get_or_create(name=product_name)
job_guid = job_datum["job_guid"]
job_guid = job_guid[0:50]
who = job_datum.get("who") or "unknown"
who = who[0:50]
reason = job_datum.get("reason") or "unknown"
reason = reason[0:125]
state = job_datum.get("state") or "unknown"
state = state[0:25]
build_system_type = job_datum.get("build_system_type", "buildbot")
    reference_data_name = job_datum.get("reference_data_name")
default_failure_classification = FailureClassification.objects.get(name="not classified")
sh = sha1()
sh.update(
"".join(
map(
str,
[
build_system_type,
repository.name,
build_platform.os_name,
build_platform.platform,
build_platform.architecture,
machine_platform.os_name,
machine_platform.platform,
machine_platform.architecture,
job_group.name,
job_group.symbol,
job_type.name,
job_type.symbol,
option_collection_hash,
reference_data_name,
],
)
).encode("utf-8")
)
signature_hash = sh.hexdigest()
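    # The signature is a sha1 over the concatenated reference fields above,
    # so any change to build/machine platform, job group/type, options, or
    # build system yields a distinct ReferenceDataSignatures row below.
    # (One edge case visible in the code: ``reference_data_name`` may still
    # be None here, in which case the literal string "None" is what gets
    # hashed.)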
    # For buildbot this should be the buildername; if it was not provided,
    # default to the signature hash.
if not reference_data_name:
reference_data_name = signature_hash
signature, _ = ReferenceDataSignatures.objects.get_or_create(
name=reference_data_name,
signature=signature_hash,
build_system_type=build_system_type,
repository=repository.name,
defaults={
"first_submission_timestamp": time.time(),
"build_os_name": build_platform.os_name,
"build_platform": build_platform.platform,
"build_architecture": build_platform.architecture,
"machine_os_name": machine_platform.os_name,
"machine_platform": machine_platform.platform,
"machine_architecture": machine_platform.architecture,
"job_group_name": job_group.name,
"job_group_symbol": job_group.symbol,
"job_type_name": job_type.name,
"job_type_symbol": job_type.symbol,
"option_collection_hash": option_collection_hash,
},
)
tier = job_datum.get("tier") or 1
result = job_datum.get("result", "unknown")
submit_time = datetime.fromtimestamp(_get_number(job_datum.get("submit_timestamp")))
start_time = datetime.fromtimestamp(_get_number(job_datum.get("start_timestamp")))
end_time = datetime.fromtimestamp(_get_number(job_datum.get("end_timestamp")))
# first, try to create the job with the given guid (if it doesn't
# exist yet)
job_guid_root = get_guid_root(job_guid)
if not Job.objects.filter(guid__in=[job_guid, job_guid_root]).exists():
        # The job could theoretically already have been created by another
        # process running updates simultaneously, so attempt the create but
        # let it no-op if a row with the same guid exists. The most likely
        # cause is a pending and a running job that arrived in quick
        # succession and are being processed by two different workers.
Job.objects.get_or_create(
guid=job_guid,
defaults={
"repository": repository,
"signature": signature,
"build_platform": build_platform,
"machine_platform": machine_platform,
"machine": machine,
"option_collection_hash": option_collection_hash,
"job_type": job_type,
"job_group": job_group,
"product": product,
"failure_classification": default_failure_classification,
"who": who,
"reason": reason,
"result": result,
"state": state,
"tier": tier,
"submit_time": submit_time,
"start_time": start_time,
"end_time": end_time,
"last_modified": datetime.now(),
"push_id": push_id,
},
)
    # We can't just use the ``job`` returned by ``get_or_create`` above:
    # for the update we need to try the ``job_guid_root`` instance first,
    # rather than a possible retry-job instance.
try:
job = Job.objects.get(guid=job_guid_root)
except ObjectDoesNotExist:
job = Job.objects.get(guid=job_guid)
# add taskcluster metadata if applicable
    if all(k in job_datum for k in ("taskcluster_task_id", "taskcluster_retry_id")):
try:
TaskclusterMetadata.objects.create(
job=job,
task_id=job_datum["taskcluster_task_id"],
retry_id=job_datum["taskcluster_retry_id"],
)
except IntegrityError:
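            # an IntegrityError presumably means another worker already
            # created the metadata row for this job, so it is safe to ignore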
pass
# Update job with any data that would have changed
Job.objects.filter(id=job.id).update(
guid=job_guid,
signature=signature,
build_platform=build_platform,
machine_platform=machine_platform,
machine=machine,
option_collection_hash=option_collection_hash,
job_type=job_type,
job_group=job_group,
product=product,
result=result,
state=state,
tier=tier,
submit_time=submit_time,
start_time=start_time,
end_time=end_time,
last_modified=datetime.now(),
push_id=push_id,
)
    log_refs = job_datum.get("log_references", [])
    job_logs = []
    if log_refs:
        # build the status-name -> status-code map once, outside the loop
        parse_status_map = {name: value for (value, name) in JobLog.STATUSES}
        for log in log_refs:
            name = (log.get("name") or "unknown")[0:50]
            url = (log.get("url") or "unknown")[0:255]
            # unknown or missing parse_status values fall back to PENDING
            parse_status = parse_status_map.get(log.get("parse_status"), JobLog.PENDING)
            jl, _ = JobLog.objects.get_or_create(
                job=job, name=name, url=url, defaults={"status": parse_status}
            )
            job_logs.append(jl)
_schedule_log_parsing(job, job_logs, result, repository)
return job_guid
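
# A minimal, hypothetical payload for this loader (a sketch only: these are
# just the keys _load_job actually reads, with made-up values; real
# ingestion payloads carry more fields):
#
#     job_datum = {
#         "job_guid": "abc123",
#         "name": "mochitest-1",
#         "job_symbol": "m1",
#         "group_name": "Mochitests",
#         "group_symbol": "M",
#         "product_name": "firefox",
#         "who": "sheriff@example.com",
#         "reason": "scheduled",
#         "state": "completed",
#         "result": "success",
#         "tier": 1,
#         "submit_timestamp": 1500000000,
#         "start_timestamp": 1500000060,
#         "end_timestamp": 1500000600,
#         "build_platform": {"os_name": "linux", "platform": "linux64", "architecture": "x86_64"},
#         "machine_platform": {"os_name": "linux", "platform": "linux64", "architecture": "x86_64"},
#         "machine": "test-worker-001",
#         "option_collection": ["opt"],
#         "log_references": [
#             {"name": "live_backing_log", "url": "https://example.com/log.txt", "parse_status": "pending"},
#         ],
#     }
#     _load_job(repository, job_datum, push_id)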