pkg/source/apache-airflow/airflow-worker/parser.go (144 lines of code) (raw):
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apacheairflow
import (
"context"
"fmt"
"regexp"
"strings"
"github.com/GoogleCloudPlatform/khi/pkg/log"
"github.com/GoogleCloudPlatform/khi/pkg/model"
"github.com/GoogleCloudPlatform/khi/pkg/model/enum"
"github.com/GoogleCloudPlatform/khi/pkg/model/history"
"github.com/GoogleCloudPlatform/khi/pkg/model/history/grouper"
"github.com/GoogleCloudPlatform/khi/pkg/model/history/resourcepath"
"github.com/GoogleCloudPlatform/khi/pkg/parser"
apacheairflow "github.com/GoogleCloudPlatform/khi/pkg/source/apache-airflow"
"github.com/GoogleCloudPlatform/khi/pkg/task/taskid"
)
var (
	// airflowWorkerRunningHostTemplate matches the line a worker logs when it starts running a TaskInstance:
	//   Running <TaskInstance: DAG_ID.TASK_ID RUN_ID [map_index=N ][STATE]> on host WORKER
	// ref: https://github.com/apache/airflow/blob/2.7.3/airflow/cli/commands/task_command.py#L416
	//
	// DAG_ID is matched as word characters and dashes (no dots), so the first dot after
	// "TaskInstance: " is treated as the DAG/task separator. TASK_ID may contain dots
	// (e.g. task-group prefixes).
	// NOTE(review): a DAG ID that itself contains a dot is split at that first dot; the
	// repr format is inherently ambiguous there.
	airflowWorkerRunningHostTemplate = regexp.MustCompile(`Running <TaskInstance:\s(?P<dagid>[\w-]+)\.(?P<taskid>[\w.-]+)\s(?P<runid>\S+)\s(?:map_index=(?P<mapIndex>\d+)\s)?\[(?P<state>\w+)\]> on host (?P<host>.+)`)

	// airflowWorkerMarkingStatusTemplate matches the line logged when a TaskInstance is marked with a state:
	//   Marking task as STATE. dag_id=DAG_ID, task_id=TASK_ID, run_id=RUN_ID, map_index=MAP_INDEX, execution_date=..., start_date=..., end_date=...
	// ref: https://github.com/apache/airflow/blob/2.9.3/airflow/models/taskinstance.py#L1201
	//
	// The dot after STATE is matched literally. An unescaped "." would let a line without the
	// dot match with a truncated state (e.g. "SUCCES"). The map_index part is optional.
	airflowWorkerMarkingStatusTemplate = regexp.MustCompile(`Marking task as\s(?P<state>\S+)\.\sdag_id=(?P<dagid>\S+),\stask_id=(?P<taskid>\S+),\srun_id=(?P<runid>\S+),\s(?:map_index=(?P<mapIndex>\d+),\s)?.+`)
)
// AirflowWorkerParser parses airflow-worker logs and turns them into TaskInstance revisions.
// It recognizes these lifecycle messages:
//   - "Running <TaskInstance: ...> on host ..." — a TaskInstance started running on a worker.
//   - "Marking task as STATE. dag_id=..., ..." — a TaskInstance was marked with a (final) state.
type AirflowWorkerParser struct {
	// queryTaskId is the task reference returned by LogTask (the source of the logs to parse).
	queryTaskId taskid.TaskReference[[]*log.LogEntity]
	// targetLogType is the log type returned by TargetLogType.
	targetLogType enum.LogType
}

var _ parser.Parser = (*AirflowWorkerParser)(nil)
// NewAirflowWorkerParser builds an AirflowWorkerParser whose LogTask is queryTaskId
// and whose TargetLogType is targetLogType.
func NewAirflowWorkerParser(queryTaskId taskid.TaskReference[[]*log.LogEntity], targetLogType enum.LogType) *AirflowWorkerParser {
	p := &AirflowWorkerParser{targetLogType: targetLogType}
	p.queryTaskId = queryTaskId
	return p
}
// TargetLogType implements parser.Parser.
// It reports the log type this parser was configured with.
func (a *AirflowWorkerParser) TargetLogType() enum.LogType {
	logType := a.targetLogType
	return logType
}
// Dependencies implements parser.Parser.
// This parser declares no additional task dependencies; the result is an empty, non-nil slice.
func (*AirflowWorkerParser) Dependencies() []taskid.UntypedTaskReference {
	return make([]taskid.UntypedTaskReference, 0)
}
// Grouper implements parser.Parser.
// It returns grouper.AllDependentLogGrouper.
// NOTE(review): presumably this means every worker log is handled in a single dependent
// group (sequentially) rather than being split across groups — confirm against the grouper package.
func (*AirflowWorkerParser) Grouper() grouper.LogGrouper {
	return grouper.AllDependentLogGrouper
}
// Description implements parser.Parser.
// It returns a fixed, human-readable explanation of what airflow-worker logs provide.
func (*AirflowWorkerParser) Description() string {
	const description = `Airflow Worker logs contain information related to the execution of TaskInstances. By including these logs, you can gain insights into where and how each TaskInstance was executed.`
	return description
}
// GetParserName implements parser.Parser.
// It returns the display name of this parser.
func (*AirflowWorkerParser) GetParserName() string {
	const parserName = "Airflow Worker"
	return parserName
}
// LogTask implements parser.Parser.
// It returns the task reference (queryTaskId) that this parser was constructed with.
func (a *AirflowWorkerParser) LogTask() taskid.TaskReference[[]*log.LogEntity] {
	logSource := a.queryTaskId
	return logSource
}
// Parse implements parser.Parser.
//
// For every log it records an event on the worker named by the log's
// "labels.worker_id" field and records the log's main message as the summary.
// It then runs each lifecycle extractor on the log. Every extractor that recognizes
// the log contributes one TaskInstance revision. Logs that no extractor recognizes
// are not errors: Parse always returns nil.
func (*AirflowWorkerParser) Parse(ctx context.Context, l *log.LogEntity, cs *history.ChangeSet, builder *history.Builder) error {
	extractors := []airflowParserFn{
		&airflowWorkerRunningHostFn{},
		&airflowWorkerMarkingStatusFn{},
	}

	// Best-effort: a missing worker_id still records an event, on whatever name
	// GetString returned (presumably "").
	workerID, _ := l.GetString("labels.worker_id")
	cs.RecordEvent(resourcepath.AirflowWorker(model.NewAirflowWorker(workerID)))

	// Best-effort: a missing main message still records a summary.
	message, _ := l.MainMessage()
	cs.RecordLogSummary(message)

	for _, extract := range extractors {
		ti, err := extract.fn(l)
		if err != nil {
			// This extractor does not recognize the log; try the next one.
			continue
		}
		taskPath := resourcepath.AirflowTaskInstance(ti)
		verb, state := apacheairflow.TiStatusToVerb(ti)
		revision := &history.StagingResourceRevision{
			Verb:       verb,
			State:      state,
			Requestor:  "airflow-worker",
			ChangeTime: l.Timestamp(),
			Partial:    false,
			Body:       ti.ToYaml(),
		}
		cs.RecordRevision(taskPath, revision)
	}
	return nil
}
// airflowWorkerRunningHostFn extracts a TaskInstance from the
// "Running <TaskInstance: ...> on host ..." line that an airflow-worker logs when it
// starts executing a TaskInstance. The worker host is taken from the message itself.
type airflowWorkerRunningHostFn struct{}

var _ airflowParserFn = (*airflowWorkerRunningHostFn)(nil)

// fn implements airflowParserFn.
// It returns a non-nil error (and a nil TaskInstance) when the log has no main message,
// is not a "Running <TaskInstance: ...>" line, or reports a state that
// StringToTiState does not recognize.
func (*airflowWorkerRunningHostFn) fn(inputLog *log.LogEntity) (*model.AirflowTaskInstance, error) {
	textPayload, err := inputLog.MainMessage()
	if err != nil {
		return nil, fmt.Errorf("textPayload not found. maybe invalid log. please confirm the log %s", inputLog.ID())
	}
	// Cheap prefix check first so the comparatively expensive regex only runs on candidate lines.
	if !strings.HasPrefix(textPayload, "Running ") {
		return nil, fmt.Errorf("this log entity is not for TaskInstance lifecycle. abort")
	}
	re := airflowWorkerRunningHostTemplate
	matches := re.FindStringSubmatch(textPayload)
	if matches == nil {
		return nil, fmt.Errorf("this log entity is not for TaskInstance lifecycle. abort")
	}
	group := func(name string) string { return matches[re.SubexpIndex(name)] }

	stateStr := group("state")
	// Lower-case the state before conversion, as airflowWorkerMarkingStatusFn does.
	state, err := apacheairflow.StringToTiState(strings.ToLower(stateStr))
	if err != nil {
		// Parse discards extractor errors, so print a warning to keep unknown states visible.
		fmt.Printf("Warning: Could not convert Airflow state '%s' to Tistate: %v. Skipping log entry.\n", stateStr, err)
		return nil, fmt.Errorf("converting airflow state %q: %w", stateStr, err)
	}

	mapIndex := group("mapIndex")
	if mapIndex == "" {
		// map_index only appears for dynamically mapped tasks; "-1" means "not mapped".
		mapIndex = "-1"
	}
	return model.NewAirflowTaskInstance(group("dagid"), group("taskid"), group("runid"), mapIndex, group("host"), state), nil
}
// airflowWorkerMarkingStatusFn extracts a TaskInstance from the
// "Marking task as STATE. dag_id=..., task_id=..., run_id=..." line that an
// airflow-worker logs when it marks a TaskInstance with its final state. The worker
// host is taken from the log's "labels.worker_id" field.
type airflowWorkerMarkingStatusFn struct{}

var _ airflowParserFn = (*airflowWorkerMarkingStatusFn)(nil)

// fn implements airflowParserFn.
// It returns a non-nil error (and a nil TaskInstance) when the log has no main message,
// is not a "Marking task as" line, lacks "labels.worker_id", or reports a state that
// StringToTiState does not recognize.
func (*airflowWorkerMarkingStatusFn) fn(inputLog *log.LogEntity) (*model.AirflowTaskInstance, error) {
	textPayload, err := inputLog.MainMessage()
	if err != nil {
		return nil, fmt.Errorf("textPayload not found. maybe invalid log. please confirm the log %s", inputLog.ID())
	}
	// Cheap substring check first. The regex cannot match without this phrase, so the
	// result is unchanged but the regex is skipped for unrelated lines.
	if !strings.Contains(textPayload, "Marking task as") {
		return nil, fmt.Errorf("this entity is not for TaskInstance lifecycle. abort")
	}
	re := airflowWorkerMarkingStatusTemplate
	matches := re.FindStringSubmatch(textPayload)
	if matches == nil {
		return nil, fmt.Errorf("this entity is not for TaskInstance lifecycle. abort")
	}
	group := func(name string) string { return matches[re.SubexpIndex(name)] }

	workerID, err := inputLog.GetString("labels.worker_id") // TODO remove Cloud Logging Dependency
	if err != nil {
		return nil, fmt.Errorf("worker_id not found. maybe invalid log. please confirm the log %s", inputLog.ID())
	}

	// The state is lower-cased before conversion; see the ref for how Airflow formats it.
	// ref: https://github.com/apache/airflow/blob/2.7.3/airflow/models/taskinstance.py#L1392
	state := strings.ToLower(group("state"))
	tiState, err := apacheairflow.StringToTiState(state)
	if err != nil {
		// Parse discards extractor errors, so print a warning to keep unknown states visible.
		fmt.Printf("Warning: Could not convert Airflow state '%s' to Tistate: %v. Skipping log entry.\n", state, err)
		return nil, fmt.Errorf("converting airflow state %q: %w", state, err)
	}

	mapIndex := group("mapIndex")
	if mapIndex == "" {
		// map_index only appears for dynamically mapped tasks; "-1" means "not mapped".
		mapIndex = "-1"
	}
	return model.NewAirflowTaskInstance(group("dagid"), group("taskid"), group("runid"), mapIndex, workerID, tiState), nil
}
// airflowParserFn is the package-internal interface for extractors that parse a
// single airflow log and build an AirflowTaskInstance from it.
type airflowParserFn interface {
	// fn returns a non-nil AirflowTaskInstance when inputLog describes a task instance.
	// On any error (e.g. textPayload not found, or the log is not a lifecycle message),
	// fn must return a nil AirflowTaskInstance together with the non-nil error.
	fn(inputLog *log.LogEntity) (*model.AirflowTaskInstance, error)
}