in pkg/source/apache-airflow/dag-processor-manager/parser.go [111:212]
// fromLogEntity parses a single log line emitted by the Airflow
// DagFileProcessorManager stats table and converts it into a
// model.DagFileProcessorStats.
//
// The line is split on whitespace. It is treated as a stats row only when it
// has between 3 and 7 columns (inclusive) and its first column starts with
// a.dagFilePath; otherwise nil is returned.
//
// TODO: add support for `last_num_of_db_queries` (available from Airflow 2.10).
// The current implementation is based on a fixed number of 6 columns, but this
// will change to 7 from 2.10, so the implementation needs to support both.
// Considering future updates, it would be advisable to make the number of
// columns dynamically adjustable. Fortunately DagProcessorManager also outputs
// the column headers to the log; that hint can be used to make the adjustment.
func (a *AirflowDagProcessorParser) fromLogEntity(log string) *model.DagFileProcessorStats {
	// Strip the "DAG_PROCESSOR_MANAGER_LOG:" prefix (Cloud Composer 3 support).
	log = strings.TrimPrefix(log, "DAG_PROCESSOR_MANAGER_LOG:")

	// Divide the line into its non-empty, whitespace-separated columns.
	frags := strings.Fields(log)

	// According to the Airflow source code, the number of columns can be 3..7:
	// https://github.com/apache/airflow/blob/2.7.3/airflow/dag_processing/manager.py#L866
	//   3 = can happen (file_path, num_dags, num_errors)
	//   4 = can happen (file_path, num_dags, num_errors, pid or runtime)
	//   5 = major pattern (file_path, num_dags, num_errors, last_runtime, last_run)
	//   6 = can happen (file_path, num_dags, num_errors, last_runtime, last_run, pid or runtime)
	//   7 = major pattern (all columns)
	// Anything outside that range cannot be mapped to columns below, so it is
	// rejected here rather than returned as a stats object with empty fields.
	if len(frags) < 3 || len(frags) > 7 {
		return nil
	}
	if !strings.HasPrefix(frags[0], a.dagFilePath) {
		return nil
	}

	// runtime and last_runtime must contain "s".
	// ref: https://github.com/apache/airflow/blob/2.7.3/airflow/dag_processing/manager.py#L870
	isRuntime := func(s string) bool {
		return strings.Contains(s, "s")
	}

	var runtime, numberOfDags, numberOfErrors string
	switch len(frags) {
	case 3:
		// FILE_PATH DAG ERROR
		numberOfDags, numberOfErrors = frags[1], frags[2]
	case 4:
		// FILE_PATH RUNTIME DAG ERROR when the second column looks like a
		// runtime; otherwise the second column is skipped (presumably the PID).
		if isRuntime(frags[1]) {
			runtime = frags[1]
		}
		numberOfDags, numberOfErrors = frags[2], frags[3]
	case 5:
		if isRuntime(frags[2]) {
			// FILE_PATH PID RUNTIME DAG ERROR
			runtime, numberOfDags, numberOfErrors = frags[2], frags[3], frags[4]
		} else {
			// FILE_PATH DAG ERROR LAST_RUNTIME LAST_RUN
			numberOfDags, numberOfErrors = frags[1], frags[2]
		}
	case 6:
		switch {
		case isRuntime(frags[1]):
			// FILE_PATH RUNTIME DAG ERROR LAST_RUNTIME LAST_RUN
			runtime, numberOfDags, numberOfErrors = frags[1], frags[2], frags[3]
		case isRuntime(frags[2]):
			// FILE_PATH PID RUNTIME DAG ERROR LAST_RUNTIME/LAST_RUN
			runtime, numberOfDags, numberOfErrors = frags[2], frags[3], frags[4]
		default:
			// FILE_PATH PID DAG ERROR LAST_RUNTIME LAST_RUN
			numberOfDags, numberOfErrors = frags[2], frags[3]
		}
	case 7:
		// FILE_PATH PID RUNTIME DAG ERROR LAST_RUNTIME LAST_RUN
		runtime, numberOfDags, numberOfErrors = frags[2], frags[3], frags[4]
	}

	return model.NewDagFileProcessorStats(
		frags[0],
		runtime,
		numberOfDags,
		numberOfErrors,
	)
}