pkg/source/apache-airflow/airflow-scheduler/parser.go (115 lines of code) (raw):

// Copyright 2025 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package apacheairflow import ( "context" "fmt" "regexp" "github.com/GoogleCloudPlatform/khi/pkg/log" "github.com/GoogleCloudPlatform/khi/pkg/model" "github.com/GoogleCloudPlatform/khi/pkg/model/enum" "github.com/GoogleCloudPlatform/khi/pkg/model/history" "github.com/GoogleCloudPlatform/khi/pkg/model/history/grouper" "github.com/GoogleCloudPlatform/khi/pkg/model/history/resourcepath" "github.com/GoogleCloudPlatform/khi/pkg/parser" airflow "github.com/GoogleCloudPlatform/khi/pkg/source/apache-airflow" "github.com/GoogleCloudPlatform/khi/pkg/task/taskid" ) // Regex templates to parse Airflow log format var ( // \t<TaskInstance: $DAGID.$TASKID $RUNID map_index=$MAPINDEX [scheduled]> // ref: https://github.com/apache/airflow/blob/2.7.3/airflow/models/taskinstance.py#L1179 airflowTiTemplate = regexp.MustCompile(`\s<TaskInstance:\s(?P<dagid>\w+)\.(?P<taskid>[\w.-]+)\s(?P<runid>\S+)\s(?:map_index=(?P<mapIndex>\d+)\s)?\[(?P<state>\w+)\]>`) // TODO Add log types // * Trying to enqueue tasks: [<TaskInstance: airflow_monitoring.echo scheduled__2025-04-10T04:00:00+00:00 [scheduled]>] for executor: CeleryExecutor(parallelism=0) (ONLY appliucable from 2.10.x) // * Sending TaskInstanceKey(dag_id='airflow_monitoring', task_id='echo', run_id='scheduled__2025-04-10T04:00:00+00:00', try_number=1, map_index=-1) to CeleryExecutor with priority 2147483647 and queue default // * Adding to queue: ['airflow', 'tasks', 'run', 'airflow_monitoring', 'echo', 'scheduled__2025-04-10T04:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/airflow_monitoring.py'] // Received executor event with state queued for task instance TaskInstanceKey(dag_id='khi_dag', task_id='add_one', run_id='scheduled__2023-11-30T05:00:00+00:00', try_number=1, map_index=0) // ref: https://github.com/apache/airflow/blob/2.7.3/airflow/jobs/scheduler_job_runner.py#L685 airflowSchedulerReceivedEventTemplate = regexp.MustCompile(`Received executor event with state (?P<state>.+) for task instance TaskInstanceKey\(dag_id='(?P<dagid>.+)', task_id='(?P<taskid>.+)', run_id='(?P<runid>.+)',.*map_index=(?P<mapIndex>\d+)\)`) // TODO Add other log types // * Setting external_id for <TaskInstance: airflow_monitoring.echo scheduled__2025-04-10T04:00:00+00:00 [queued]> to cf33ab13-b638-4abb-8484-9faf4cc19345 // * Marking run <DagRun airflow_monitoring @ 2025-04-10 04:00:00+00:00: scheduled__2025-04-10T04:00:00+00:00, state:running, queued_at: 2025-04-10 04:10:00.679237+00:00. externally triggered: False> successful // TaskInstance Finished: dag_id=DAGID, task_id=TASKID, run_id=RUNID, map_index=MAPINDEX, ..., state=STATE ... // ref: https://github.com/apache/airflow/blob/2.7.3/airflow/jobs/scheduler_job_runner.py#L715 airflowSchedulerTaskFinishedTemplate = regexp.MustCompile(`TaskInstance Finished:\s+dag_id=(?P<dagid>\S+),\s+task_id=(?P<taskid>\S+),\s+run_id=(?P<runid>\S+),\s+map_index=(?P<mapIndex>\S+),\s+.*?state=(?P<state>\S+)(?:,\s+executor=.+?)?,\s+executor_state.+`) // TODO Add other log types // * Received executor event with state success for task instance TaskInstanceKey(dag_id='airflow_monitoring', task_id='echo', run_id='scheduled__2025-04-10T04:00:00+00:00', try_number=1, map_index=-1) // Detected zombie job: {'full_filepath': '...', 'processor_subdir': '...', 'msg': "{'DAG Id': 'DAG_ID', 'Task Id': 'TASK_ID', 'Run Id': 'RUN_ID', 'Hostname': 'WORKER', ... // ref: https://github.com/apache/airflow/blob/2.7.3/airflow/jobs/scheduler_job_runner.py#L1746C55-L1746C62 airflowSchedulerZombieDetectedTemplate = regexp.MustCompile(`'DAG Id':\s*'(?P<dagid>[^']+)',\s*'Task Id':\s*'(?P<taskid>[^']+)',\s*'Run Id':\s*'(?P<runid>[^']+)',\s*('Map Index':\s*'(?P<mapIndex>[^']+)',\s*)?'Hostname':\s*'(?P<host>[^']+)'`) ) // Parse airflow-scheduler logs and make them into TaskInstances. // This parser will detect these lifecycles; // - scheduled // - queued // - success // - failed type AirflowSchedulerParser struct { queryTaskId taskid.TaskReference[[]*log.LogEntity] targetLogType enum.LogType } func NewAirflowSchedulerParser(queryTaskId taskid.TaskReference[[]*log.LogEntity], targetLogType enum.LogType) *AirflowSchedulerParser { return &AirflowSchedulerParser{queryTaskId: queryTaskId, targetLogType: targetLogType} } // TargetLogType implements parser.Parser. func (t *AirflowSchedulerParser) TargetLogType() enum.LogType { return t.targetLogType } var _ parser.Parser = &AirflowSchedulerParser{} func (*AirflowSchedulerParser) Dependencies() []taskid.UntypedTaskReference { return []taskid.UntypedTaskReference{} } func (*AirflowSchedulerParser) Grouper() grouper.LogGrouper { return grouper.AllDependentLogGrouper } func (*AirflowSchedulerParser) Description() string { return `Airflow Scheduler logs contain information related to the scheduling of TaskInstances, making it an ideal source for understanding the lifecycle of TaskInstances.` } func (*AirflowSchedulerParser) GetParserName() string { return "Airflow Scheduler" } func (a *AirflowSchedulerParser) LogTask() taskid.TaskReference[[]*log.LogEntity] { return a.queryTaskId } func (t *AirflowSchedulerParser) Parse(ctx context.Context, l *log.LogEntity, cs *history.ChangeSet, builder *history.Builder) error { ti, err := t.parseInternal(l) if err != nil { return err } if ti == nil { // not found return nil } resourcePath := resourcepath.AirflowTaskInstance(ti) verb, state := airflow.TiStatusToVerb(ti) cs.RecordRevision(resourcePath, &history.StagingResourceRevision{ Verb: verb, State: state, Requestor: "airflow-scheduler", ChangeTime: l.Timestamp(), Partial: false, Body: ti.ToYaml(), }) summary, _ := l.MainMessage() cs.RecordLogSummary(summary) cs.RecordEvent(resourcePath) return nil } // parseInternal generates AirflowTaskInstance from the logEntity. // If the log does not contain information about ti, parseInternal throw non-nil error. func (t *AirflowSchedulerParser) parseInternal(l *log.LogEntity) (*model.AirflowTaskInstance, error) { // TODO since all templates can generate same parametors(dagid,taskid,runid,state,mapIndex), I don't create `airflowParserFn`s for each template. // TODO create a generic airflowParserFn which generate ti from a simple template. template := []*regexp.Regexp{ airflowTiTemplate, airflowSchedulerReceivedEventTemplate, airflowSchedulerTaskFinishedTemplate, } textPayload, err := l.GetString("textPayload") if err != nil { return nil, fmt.Errorf("textPayload not found. maybe this is not airflow log. please confirm the log. ID: %s", l.ID()) } // iterates through a list of regular expressions to match the log entity against. for _, re := range template { // If the log entity matches one of the regular expressions, // the function extracts the following information from the log message matches := re.FindStringSubmatch(textPayload) if matches == nil { continue } dagid := matches[re.SubexpIndex("dagid")] taskid := matches[re.SubexpIndex("taskid")] runid := matches[re.SubexpIndex("runid")] stateStr := matches[re.SubexpIndex("state")] // Renamed original string variable mapIndex := "-1" // optional, applied for only Dynamic DAG. if matches[re.SubexpIndex("mapIndex")] != "" { mapIndex = matches[re.SubexpIndex("mapIndex")] } state, err := airflow.StringToTiState(stateStr) if err != nil { fmt.Printf("Warning: Could not convert Airflow state '%s' to Tistate: %v. Skipping log entry.\n", stateStr, err) continue } return model.NewAirflowTaskInstance(dagid, taskid, runid, mapIndex, "", state), nil } matches := airflowSchedulerZombieDetectedTemplate.FindStringSubmatch(textPayload) if matches == nil { // this log entity is not for TaskInstance lifecycle. return nil, nil } dagid := matches[airflowSchedulerZombieDetectedTemplate.SubexpIndex("dagid")] taskid := matches[airflowSchedulerZombieDetectedTemplate.SubexpIndex("taskid")] runid := matches[airflowSchedulerZombieDetectedTemplate.SubexpIndex("runid")] state := model.TASKINSTANCE_ZOMBIE host := matches[airflowSchedulerZombieDetectedTemplate.SubexpIndex("host")] mapIndex := "-1" if i := matches[airflowSchedulerZombieDetectedTemplate.SubexpIndex("mapIndex")]; i != "" { mapIndex = i } return model.NewAirflowTaskInstance(dagid, taskid, runid, mapIndex, host, state), nil }