pkg/source/apache-airflow/dag-processor-manager/parser.go (139 lines of code) (raw):

// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package apacheairflow import ( "context" "fmt" "strings" "github.com/GoogleCloudPlatform/khi/pkg/log" "github.com/GoogleCloudPlatform/khi/pkg/model" "github.com/GoogleCloudPlatform/khi/pkg/model/enum" "github.com/GoogleCloudPlatform/khi/pkg/model/history" "github.com/GoogleCloudPlatform/khi/pkg/model/history/grouper" "github.com/GoogleCloudPlatform/khi/pkg/model/history/resourcepath" "github.com/GoogleCloudPlatform/khi/pkg/parser" "github.com/GoogleCloudPlatform/khi/pkg/task/taskid" ) type AirflowDagProcessorParser struct { dagFilePath string logTask taskid.TaskReference[[]*log.LogEntity] targetLogType enum.LogType } func NewAirflowDagProcessorParser(dagFilePath string, logTask taskid.TaskReference[[]*log.LogEntity], targetLogType enum.LogType) *AirflowDagProcessorParser { return &AirflowDagProcessorParser{ dagFilePath: dagFilePath, logTask: logTask, targetLogType: targetLogType, } } // TargetLogType implements parser.Parser. func (a *AirflowDagProcessorParser) TargetLogType() enum.LogType { return a.targetLogType } var _ parser.Parser = (*AirflowDagProcessorParser)(nil) func (*AirflowDagProcessorParser) Dependencies() []taskid.UntypedTaskReference { return []taskid.UntypedTaskReference{} } func (*AirflowDagProcessorParser) Description() string { return "The DagProcessorManager logs contain information for investigating the number of DAGs included in each Python file and the time it took to parse them. You can get information about missing DAGs and load." } func (*AirflowDagProcessorParser) GetParserName() string { return "Airflow DagProcessorManager" } // Grouper implements parser.Parser. func (*AirflowDagProcessorParser) Grouper() grouper.LogGrouper { return grouper.AllDependentLogGrouper } func (a *AirflowDagProcessorParser) LogTask() taskid.TaskReference[[]*log.LogEntity] { return a.logTask } func (a *AirflowDagProcessorParser) Parse(ctx context.Context, l *log.LogEntity, cs *history.ChangeSet, builder *history.Builder) error { textPayload, _ := l.MainMessage() dagFileProcessorStats := a.fromLogEntity(textPayload) if dagFileProcessorStats == nil { // this is not a dag file processor stats log, skip return nil } cs.RecordRevision(resourcepath.DagFileProcessorStats(dagFileProcessorStats), &history.StagingResourceRevision{ Verb: enum.RevisionVerbComposerTaskInstanceStats, State: enum.RevisionStateConditionTrue, Requestor: "dag-processor-manager", ChangeTime: l.Timestamp(), Partial: false, Body: fmt.Sprintf("dags: %s\nerrors: %s", dagFileProcessorStats.NumberOfDags(), dagFileProcessorStats.NumberOfErrors()), }) // Emphasize "Error" for parsing dag failures if dagFileProcessorStats.NumberOfErrors() != "0" { cs.RecordLogSeverity(enum.SeverityError) } var summary string if dagFileProcessorStats.Runtime() != "" { summary = fmt.Sprintf("dags=%s, errors=%s, runtime=%s", dagFileProcessorStats.NumberOfDags(), dagFileProcessorStats.NumberOfErrors(), dagFileProcessorStats.Runtime()) } else { summary = fmt.Sprintf("dags=%s, errors= %s", dagFileProcessorStats.NumberOfDags(), dagFileProcessorStats.NumberOfErrors()) } cs.RecordLogSummary(summary) return nil } // parse DAG Processor Manager's parse result log. // Sample: /home/airflow/gcs/dags/main.py 40441 4.06s 64 0 6.93s 2024-05-02T05:14:54 func (a *AirflowDagProcessorParser) fromLogEntity(log string) *model.DagFileProcessorStats { // TODO add support for `last_num_of_db_queries` (available from 2.10) // The current implementation is based on a fixed number of 6 columns, but this will change to 7 fom 2.10. // the implementation needs to support both 6 and 7 columns. Considering future updates, it would be advisable to make the number of columns dynamically adjustable. // Fortunately DagProcessorManhager also outputs the column headers to the log. This hint can make adjustments. // remove a string "DAG_PROCESSOR_MANAGER_LOG:" from the string(Cloud Composer 3 support) log = strings.TrimPrefix(log, "DAG_PROCESSOR_MANAGER_LOG:") // devide the string with " ". var fragmentation []string for _, s := range strings.Split(log, " ") { if s != "" { fragmentation = append(fragmentation, s) } } validate := func(f []string) bool { // according to the source code, the number of output can be 3, 4, 5, 6, 7 // https://github.com/apache/airflow/blob/2.7.3/airflow/dag_processing/manager.py#L866 // case 3 = can happen(file_path, num_dags, num_errors) // case 4 = can happen(file_path, num_dags, num_errors, pid or runtime) // case 5 = it's a major pattern(file_path, num_dags, num_errors, last_runtime, last_run) // case 6 = can happen(file_path, num_dags, num_errors, last_runtime, last_run, pid or runtime) // case 7 = it's a major pattern(all) if len(f) < 2 || len(f) > 7 { return false } if !strings.HasPrefix(f[0], a.dagFilePath) { return false } return true } if !validate(fragmentation) { return nil } return func(frags []string) *model.DagFileProcessorStats { filePath := frags[0] var runtime, numberOfDags, numberOfErrors string // runtime and last_runtime must contain "s" // ref: https://github.com/apache/airflow/blob/2.7.3/airflow/dag_processing/manager.py#L870 isRuntime := func(s string) bool { return strings.Contains(s, "s") } switch len(frags) { // the length must be between 3~7(inclusive) case 3: // FILE_PATH DAG ERROR numberOfDags, numberOfErrors = frags[1], frags[2] case 4: guess := frags[1] if isRuntime(guess) { // FILE_PATH RUNTIME DAG ERROR runtime = frags[1] } numberOfDags, numberOfErrors = frags[2], frags[3] case 5: guess := frags[2] // FILE_PATH PID RUNTIME DAG ERROR if isRuntime(guess) { runtime, numberOfDags, numberOfErrors = frags[2], frags[3], frags[4] } else { // FILE_PATH DAG ERROR LAST_RUNTIME LAST_RUN numberOfDags, numberOfErrors = frags[1], frags[2] } case 6: // FILE_PATH RUNTIME DAG ERROR LAST_RUNTIME LAST_RUN guess := frags[1] if isRuntime(guess) { runtime, numberOfDags, numberOfErrors = frags[1], frags[2], frags[3] break } // FILE_PATH PID RUNTIME DAG ERROR LAST_RUNTIME/LAST_RUN // or // FILE_PATH PID DAG ERROR LAST_RUNTIME LAST_RUN guess = frags[2] if isRuntime(guess) { runtime, numberOfDags, numberOfErrors = frags[2], frags[3], frags[4] break } numberOfDags, numberOfErrors = frags[2], frags[3] case 7: // FILE_PATH PID RUNTIME DAG ERROR LAST_RUNTIME LAST_RUN runtime, numberOfDags, numberOfErrors = frags[2], frags[3], frags[4] } return model.NewDagFileProcessorStats( filePath, runtime, numberOfDags, numberOfErrors, ) }(fragmentation) }