in backend/python/plugins/azuredevops/azuredevops/streams/jobs.py [0:0]
def convert(self, j: Job, ctx: Context) -> Iterable[devops.CICDTask]:
    """Convert an Azure DevOps job into a domain-layer ``CICDTask``.

    Jobs without a ``start_time`` produce no output.

    Args:
        j: The job record to convert.
        ctx: Conversion context; supplies the scope and the optional
            ``deployment_pattern`` / ``production_pattern`` regexes from
            the scope config, both matched against the job name.

    Yields:
        A single ``devops.CICDTask`` describing the job.
    """
    if not j.start_time:
        return

    # Map the job result onto the domain result; results not listed
    # here leave ``result`` as None.
    result = None
    if j.result == Job.JobResult.Abandoned:
        result = devops.CICDResult.ABORT
    elif j.result == Job.JobResult.Canceled:
        result = devops.CICDResult.ABORT
    elif j.result == Job.JobResult.Failed:
        result = devops.CICDResult.FAILURE
    elif j.result == Job.JobResult.Skipped:
        result = devops.CICDResult.ABORT
    elif j.result == Job.JobResult.Succeeded:
        result = devops.CICDResult.SUCCESS
    elif j.result == Job.JobResult.SucceededWithIssues:
        result = devops.CICDResult.FAILURE

    # Pending jobs are reported as in progress, same as running ones.
    status = None
    if j.state == Job.JobState.Completed:
        status = devops.CICDStatus.DONE
    elif j.state == Job.JobState.InProgress:
        status = devops.CICDStatus.IN_PROGRESS
    elif j.state == Job.JobState.Pending:
        status = devops.CICDStatus.IN_PROGRESS

    scope_config = ctx.scope_config

    # Default to BUILD unless the job name matches the deployment pattern.
    task_type = devops.CICDType.BUILD
    if scope_config.deployment_pattern and scope_config.deployment_pattern.search(j.name):
        task_type = devops.CICDType.DEPLOYMENT

    # Default to TESTING unless the job name matches the production pattern.
    environment = devops.CICDEnvironment.TESTING
    if scope_config.production_pattern and scope_config.production_pattern.search(j.name):
        environment = devops.CICDEnvironment.PRODUCTION

    if j.finish_time:
        # Subtract the full timestamps: ``datetime.second`` is only the
        # seconds-of-minute field (0-59), so comparing those fields gives
        # a wrong duration for anything that crosses a minute boundary.
        # Truncated to int to keep yielding whole seconds.
        elapsed = j.finish_time - j.start_time
        duration_sec = int(abs(elapsed.total_seconds()))
    else:
        duration_sec = 0

    yield devops.CICDTask(
        id=j.id,
        name=j.name,
        pipeline_id=j.build_id,
        status=status,
        created_date=j.start_time,
        finished_date=j.finish_time,
        result=result,
        type=task_type,
        duration_sec=duration_sec,
        environment=environment,
        cicd_scope_id=ctx.scope.domain_id(),
    )