pkg/model/composer.go (116 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
"gopkg.in/yaml.v3"
)
type Tistate string
const (
// ref: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#task-instances
TASKINSTANCE_NONE Tistate = "none"
TASKINSTANCE_SCHEDULED Tistate = "scheduled"
TASKINSTANCE_QUEUED Tistate = "queued"
TASKINSTANCE_RUNNING Tistate = "running"
TASKINSTANCE_SUCCESS Tistate = "success"
TASKINSTANCE_SHUTDOWN Tistate = "shutdown"
TASKINSTANCE_RESTARTING Tistate = "restarting"
TASKINSTANCE_FAILED Tistate = "failed"
TASKINSTANCE_SKIPPED Tistate = "skipped"
TASKINSTANCE_UP_FOR_RETRY Tistate = "up_for_retry"
TASKINSTANCE_DEFERRED Tistate = "deferred"
TASKINSTANCE_UP_FOR_RESCHEDULE Tistate = "up_for_reschedule"
TASKINSTANCE_REMOVED Tistate = "removed"
TASKINSTANCE_UPSTREAM_FAILED Tistate = "upstream_failed"
// Original States //
// Zombie status for KHI view
TASKINSTANCE_ZOMBIE Tistate = "zombie"
)
// ref: https://github.com/apache/airflow/blob/main/airflow/models/taskinstance.py#L1187
type AirflowTaskInstance struct {
dagId string // primary key
taskId string // primary key
runId string // primary key
mapIndex string // primary key
host string
status Tistate
}
func NewAirflowTaskInstance(dagId string, taskId string, runId string, mapIndex string, host string, status Tistate) *AirflowTaskInstance {
return &AirflowTaskInstance{
dagId: dagId,
taskId: taskId,
runId: runId,
mapIndex: mapIndex,
host: host,
status: status,
}
}
func (a *AirflowTaskInstance) DagId() string {
return a.dagId
}
func (a *AirflowTaskInstance) TaskId() string {
return a.taskId
}
func (a *AirflowTaskInstance) RunId() string {
return a.runId
}
func (a *AirflowTaskInstance) MapIndex() string {
return a.mapIndex
}
func (a *AirflowTaskInstance) Host() string {
return a.host
}
func (a *AirflowTaskInstance) Status() Tistate {
return a.status
}
func (a *AirflowTaskInstance) ToYaml() string {
b, err := yaml.Marshal(a)
if err != nil {
return ""
}
return string(b)
}
type AirflowWorker struct {
host string
}
func NewAirflowWorker(host string) *AirflowWorker {
return &AirflowWorker{
host: host,
}
}
func (a *AirflowWorker) Host() string {
return a.host
}
func (a *AirflowWorker) ToYaml() string {
b, err := yaml.Marshal(a)
if err != nil {
return ""
}
return string(b)
}
type DagFileProcessorStats struct {
dagFilePath string `yaml:"dagFilePath"`
runtime string `yaml:"runtime"`
numberOfDags string `yaml:"numberOfDags"`
numberOfErrors string `yaml:"numberOfErrors"`
}
func NewDagFileProcessorStats(dagFilePath string, runtime string, numberOfDags string, numberOfErrors string) *DagFileProcessorStats {
return &DagFileProcessorStats{
dagFilePath: dagFilePath,
runtime: runtime,
numberOfDags: numberOfDags,
numberOfErrors: numberOfErrors,
}
}
func (s *DagFileProcessorStats) ToYaml() string {
b, err := yaml.Marshal(s)
if err != nil {
return ""
}
return string(b)
}
func (s *DagFileProcessorStats) DagFilePath() string {
return s.dagFilePath
}
func (s *DagFileProcessorStats) Runtime() string {
return s.runtime
}
func (s *DagFileProcessorStats) NumberOfDags() string {
return s.numberOfDags
}
func (s *DagFileProcessorStats) NumberOfErrors() string {
return s.numberOfErrors
}