receiver/githubreceiver/trace_event_handling.go (270 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package githubreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/githubreceiver" import ( "crypto/sha256" "encoding/hex" "errors" "fmt" "strings" "github.com/google/go-github/v70/github" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" semconv "go.opentelemetry.io/collector/semconv/v1.27.0" "go.uber.org/multierr" "go.uber.org/zap" ) func (gtr *githubTracesReceiver) handleWorkflowRun(e *github.WorkflowRunEvent) (ptrace.Traces, error) { t := ptrace.NewTraces() r := t.ResourceSpans().AppendEmpty() resource := r.Resource() err := gtr.getWorkflowRunAttrs(resource, e) if err != nil { return ptrace.Traces{}, fmt.Errorf("failed to get workflow run attributes: %w", err) } traceID, err := newTraceID(e.GetWorkflowRun().GetID(), e.GetWorkflowRun().GetRunAttempt()) if err != nil { gtr.logger.Sugar().Error("failed to generate trace ID", zap.Error(err)) } err = gtr.createRootSpan(r, e, traceID) if err != nil { gtr.logger.Sugar().Error("failed to create root span", zap.Error(err)) return ptrace.Traces{}, errors.New("failed to create root span") } return t, nil } // handleWorkflowJob handles the creation of spans for a GitHub Workflow Job // events, including the underlying steps within each job. A `job` maps to the // semantic conventions for a `cicd.pipeline.task`. func (gtr *githubTracesReceiver) handleWorkflowJob(e *github.WorkflowJobEvent) (ptrace.Traces, error) { t := ptrace.NewTraces() r := t.ResourceSpans().AppendEmpty() resource := r.Resource() err := gtr.getWorkflowJobAttrs(resource, e) if err != nil { return ptrace.Traces{}, fmt.Errorf("failed to get workflow run attributes: %w", err) } traceID, err := newTraceID(e.GetWorkflowJob().GetRunID(), int(e.GetWorkflowJob().GetRunAttempt())) if err != nil { gtr.logger.Sugar().Error("failed to generate trace ID", zap.Error(err)) } parentID, err := gtr.createParentSpan(r, e, traceID) if err != nil { gtr.logger.Sugar().Error("failed to create parent span", zap.Error(err)) return ptrace.Traces{}, errors.New("failed to create parent span") } err = gtr.createStepSpans(r, e, traceID, parentID) if err != nil { gtr.logger.Sugar().Error("failed to create step spans", zap.Error(err)) return ptrace.Traces{}, errors.New("failed to create step spans") } return t, nil } // newTraceID creates a deterministic Trace ID based on the provided inputs of // runID and runAttempt. `t` is appended to the end of the input to // differentiate between a deterministic traceID and the parentSpanID. func newTraceID(runID int64, runAttempt int) (pcommon.TraceID, error) { input := fmt.Sprintf("%d%dt", runID, runAttempt) // TODO: Determine if this is the best hashing algorithm to use. This is // more likely to generate a unique hash compared to MD5 or SHA1. Could // alternatively use UUID library to generate a unique ID by also using a // hash. hash := sha256.Sum256([]byte(input)) idHex := hex.EncodeToString(hash[:]) var id pcommon.TraceID _, err := hex.Decode(id[:], []byte(idHex[:32])) if err != nil { return pcommon.TraceID{}, err } return id, nil } // newParentId creates a deterministic Parent Span ID based on the provided // runID and runAttempt. `s` is appended to the end of the input to // differentiate between a deterministic traceID and the parentSpanID. func newParentSpanID(runID int64, runAttempt int) (pcommon.SpanID, error) { input := fmt.Sprintf("%d%ds", runID, runAttempt) hash := sha256.Sum256([]byte(input)) spanIDHex := hex.EncodeToString(hash[:]) var spanID pcommon.SpanID _, err := hex.Decode(spanID[:], []byte(spanIDHex[16:32])) if err != nil { return pcommon.SpanID{}, err } return spanID, nil } // createRootSpan creates a root span based on the provided event, associated // with the deterministic traceID. func (gtr *githubTracesReceiver) createRootSpan( resourceSpans ptrace.ResourceSpans, event *github.WorkflowRunEvent, traceID pcommon.TraceID, ) error { scopeSpans := resourceSpans.ScopeSpans().AppendEmpty() span := scopeSpans.Spans().AppendEmpty() rootSpanID, err := newParentSpanID(event.GetWorkflowRun().GetID(), event.GetWorkflowRun().GetRunAttempt()) if err != nil { return fmt.Errorf("failed to generate root span ID: %w", err) } span.SetTraceID(traceID) span.SetSpanID(rootSpanID) span.SetName(event.GetWorkflowRun().GetName()) span.SetKind(ptrace.SpanKindServer) span.SetStartTimestamp(pcommon.NewTimestampFromTime(event.GetWorkflowRun().GetRunStartedAt().Time)) span.SetEndTimestamp(pcommon.NewTimestampFromTime(event.GetWorkflowRun().GetUpdatedAt().Time)) switch strings.ToLower(event.WorkflowRun.GetConclusion()) { case "success": span.Status().SetCode(ptrace.StatusCodeOk) case "failure": span.Status().SetCode(ptrace.StatusCodeError) default: span.Status().SetCode(ptrace.StatusCodeUnset) } span.Status().SetMessage(event.GetWorkflowRun().GetConclusion()) // Attempt to link to previous trace ID if applicable if event.GetWorkflowRun().GetPreviousAttemptURL() != "" && event.GetWorkflowRun().GetRunAttempt() > 1 { gtr.logger.Debug("Linking to previous trace ID for WorkflowRunEvent") previousRunAttempt := event.GetWorkflowRun().GetRunAttempt() - 1 previousTraceID, err := newTraceID(event.GetWorkflowRun().GetID(), previousRunAttempt) if err != nil { return fmt.Errorf("failed to generate previous traceID: %w", err) } link := span.Links().AppendEmpty() link.SetTraceID(previousTraceID) gtr.logger.Debug("successfully linked to previous trace ID", zap.String("previousTraceID", previousTraceID.String())) } return nil } // createParentSpan creates a parent span based on the provided event, associated // with the deterministic traceID. func (gtr *githubTracesReceiver) createParentSpan( resourceSpans ptrace.ResourceSpans, event *github.WorkflowJobEvent, traceID pcommon.TraceID, ) (pcommon.SpanID, error) { scopeSpans := resourceSpans.ScopeSpans().AppendEmpty() span := scopeSpans.Spans().AppendEmpty() parentSpanID, err := newParentSpanID(event.GetWorkflowJob().GetRunID(), int(event.GetWorkflowJob().GetRunAttempt())) if err != nil { return pcommon.SpanID{}, fmt.Errorf("failed to generate parent span ID: %w", err) } jobSpanID, err := newJobSpanID(event.GetWorkflowJob().GetRunID(), int(event.GetWorkflowJob().GetRunAttempt()), event.GetWorkflowJob().GetName()) if err != nil { return pcommon.SpanID{}, fmt.Errorf("failed to generate job span ID: %w", err) } span.SetTraceID(traceID) span.SetParentSpanID(parentSpanID) span.SetSpanID(jobSpanID) span.SetName(event.GetWorkflowJob().GetName()) span.SetKind(ptrace.SpanKindServer) // Workflow Job event start times provided by GitHub do not always match the // start time of the actual job. Generally they are reported a second after // the actual step start time. Thus, we use the first step start time as the // span start time while using the normal completedAt time for the end time. steps := event.GetWorkflowJob().Steps if len(steps) > 0 { span.SetStartTimestamp(pcommon.NewTimestampFromTime(steps[0].GetStartedAt().Time)) span.SetEndTimestamp(pcommon.NewTimestampFromTime(steps[len(steps)-1].GetCompletedAt().Time)) } else { span.SetStartTimestamp(pcommon.NewTimestampFromTime(event.GetWorkflowJob().GetStartedAt().Time)) span.SetStartTimestamp(pcommon.NewTimestampFromTime(event.GetWorkflowJob().GetStartedAt().Time)) } switch strings.ToLower(event.WorkflowJob.GetConclusion()) { case "success": span.Status().SetCode(ptrace.StatusCodeOk) case "failure": span.Status().SetCode(ptrace.StatusCodeError) default: span.Status().SetCode(ptrace.StatusCodeUnset) } span.Status().SetMessage(event.GetWorkflowJob().GetConclusion()) return jobSpanID, nil } // newJobSpanId creates a deterministic Job Span ID based on the provided runID, // runAttempt, and the name of the job. func newJobSpanID(runID int64, runAttempt int, jobName string) (pcommon.SpanID, error) { input := fmt.Sprintf("%d%d%s", runID, runAttempt, jobName) hash := sha256.Sum256([]byte(input)) spanIDHex := hex.EncodeToString(hash[:]) var spanID pcommon.SpanID _, err := hex.Decode(spanID[:], []byte(spanIDHex[16:32])) if err != nil { return pcommon.SpanID{}, err } return spanID, nil } // createStepSpans is a wrapper function to create spans for each step in the // the workflow job by identifying duplicate names then creating a span for each // step. func (gtr *githubTracesReceiver) createStepSpans( resourceSpans ptrace.ResourceSpans, event *github.WorkflowJobEvent, traceID pcommon.TraceID, parentSpanID pcommon.SpanID, ) error { steps := event.GetWorkflowJob().Steps unique := newUniqueSteps(steps) var errors error for i, step := range steps { name := unique[i] err := gtr.createStepSpan(resourceSpans, traceID, parentSpanID, event, step, name) if err != nil { errors = multierr.Append(errors, err) } } return errors } // newUniqueSteps creates a new slice of step names from the provided GitHub // event steps. Each step name, if duplicated, is appended with `-n` where n is // the numbered occurrence. func newUniqueSteps(steps []*github.TaskStep) []string { if len(steps) == 0 { return nil } results := make([]string, len(steps)) count := make(map[string]int, len(steps)) for _, step := range steps { count[step.GetName()]++ } occurrences := make(map[string]int, len(steps)) for i, step := range steps { name := step.GetName() if count[name] == 1 { results[i] = name continue } occurrences[name]++ if occurrences[name] == 1 { results[i] = name } else { results[i] = fmt.Sprintf("%s-%d", name, occurrences[name]-1) } } return results } // createStepSpan creates a span with a deterministic spandID for the provided // step. func (gtr *githubTracesReceiver) createStepSpan( resourceSpans ptrace.ResourceSpans, traceID pcommon.TraceID, parentSpanID pcommon.SpanID, event *github.WorkflowJobEvent, step *github.TaskStep, name string, ) error { scopeSpans := resourceSpans.ScopeSpans().AppendEmpty() span := scopeSpans.Spans().AppendEmpty() span.SetName(name) span.SetKind(ptrace.SpanKindServer) span.SetTraceID(traceID) span.SetParentSpanID(parentSpanID) runID := event.GetWorkflowJob().GetRunID() runAttempt := int(event.GetWorkflowJob().GetRunAttempt()) jobName := event.GetWorkflowJob().GetName() stepName := step.GetName() number := int(step.GetNumber()) spanID, err := newStepSpanID(runID, runAttempt, jobName, stepName, number) if err != nil { return fmt.Errorf("failed to generate step span ID: %w", err) } span.SetSpanID(spanID) attrs := span.Attributes() attrs.PutStr(semconv.AttributeCicdPipelineTaskName, name) attrs.PutStr(AttributeCICDPipelineTaskRunStatus, step.GetStatus()) span.SetStartTimestamp(pcommon.NewTimestampFromTime(step.GetStartedAt().Time)) span.SetEndTimestamp(pcommon.NewTimestampFromTime(step.GetCompletedAt().Time)) switch strings.ToLower(step.GetConclusion()) { case "success": attrs.PutStr(AttributeCICDPipelineTaskRunStatus, AttributeCICDPipelineTaskRunStatusSuccess) span.Status().SetCode(ptrace.StatusCodeOk) case "failure": attrs.PutStr(AttributeCICDPipelineTaskRunStatus, AttributeCICDPipelineTaskRunStatusFailure) span.Status().SetCode(ptrace.StatusCodeError) case "skipped": attrs.PutStr(AttributeCICDPipelineTaskRunStatus, AttributeCICDPipelineTaskRunStatusFailure) span.Status().SetCode(ptrace.StatusCodeUnset) case "cancelled": attrs.PutStr(AttributeCICDPipelineTaskRunStatus, AttributeCICDPipelineTaskRunStatusCancellation) span.Status().SetCode(ptrace.StatusCodeUnset) default: span.Status().SetCode(ptrace.StatusCodeUnset) } span.Status().SetMessage(event.GetWorkflowJob().GetConclusion()) return nil } // newStepSpanID creates a deterministic Step Span ID based on the provided // inputs. func newStepSpanID(runID int64, runAttempt int, jobName string, stepName string, number int) (pcommon.SpanID, error) { input := fmt.Sprintf("%d%d%s%s%d", runID, runAttempt, jobName, stepName, number) hash := sha256.Sum256([]byte(input)) spanIDHex := hex.EncodeToString(hash[:]) var spanID pcommon.SpanID _, err := hex.Decode(spanID[:], []byte(spanIDHex[16:32])) if err != nil { return pcommon.SpanID{}, err } return spanID, nil }