tooling/templatize/pkg/pipeline/run.go (191 lines of code) (raw):
// Copyright 2025 Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipeline
import (
"context"
"fmt"
"os"
"path/filepath"
"github.com/go-logr/logr"
"github.com/Azure/ARO-Tools/pkg/config"
)
var DefaultDeploymentTimeoutSeconds = 30 * 60
func NewPipelineFromFile(pipelineFilePath string, cfg config.Configuration) (*Pipeline, error) {
bytes, err := config.PreprocessFile(pipelineFilePath, cfg)
if err != nil {
return nil, fmt.Errorf("failed to preprocess pipeline file %w", err)
}
err = ValidatePipelineSchema(bytes)
if err != nil {
return nil, fmt.Errorf("failed to validate pipeline schema: %w", err)
}
absPath, err := filepath.Abs(pipelineFilePath)
if err != nil {
return nil, fmt.Errorf("failed to get absolute path for pipeline file %q: %w", pipelineFilePath, err)
}
pipeline, err := NewPlainPipelineFromBytes(absPath, bytes)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal pipeline file %w", err)
}
err = pipeline.Validate()
if err != nil {
return nil, fmt.Errorf("pipeline file failed validation %w", err)
}
return pipeline, nil
}
type PipelineRunOptions struct {
DryRun bool
Step string
Region string
Configuration config.Configuration
SubsciptionLookupFunc subsciptionLookup
NoPersist bool
DeploymentTimeoutSeconds int
}
type armOutput map[string]any
type output interface {
GetValue(key string) (*outPutValue, error)
}
type outPutValue struct {
Type string `yaml:"type"`
Value any `yaml:"value"`
}
func (o armOutput) GetValue(key string) (*outPutValue, error) {
if v, ok := o[key]; ok {
if innerValue, innerConversionOk := v.(map[string]any); innerConversionOk {
returnValue := outPutValue{
Type: innerValue["type"].(string),
Value: innerValue["value"],
}
return &returnValue, nil
}
}
return nil, fmt.Errorf("key %q not found", key)
}
func RunPipeline(pipeline *Pipeline, ctx context.Context, options *PipelineRunOptions) (map[string]output, error) {
logger := logr.FromContextOrDiscard(ctx)
outPuts := make(map[string]output)
// set working directory to the pipeline file directory for the
// duration of the execution so that all commands and file references
// within the pipeline file are resolved relative to the pipeline file
originalDir, err := os.Getwd()
if err != nil {
return nil, err
}
dir := filepath.Dir(pipeline.pipelineFilePath)
logger.V(7).Info("switch current dir to pipeline file directory", "path", dir)
err = os.Chdir(dir)
if err != nil {
return nil, err
}
defer func() {
logger.V(7).Info("switch back dir", "path", originalDir)
err = os.Chdir(originalDir)
if err != nil {
logger.Error(err, "failed to switch back to original directory", "path", originalDir)
}
}()
for _, rg := range pipeline.ResourceGroups {
// prepare execution context
subscriptionID, err := options.SubsciptionLookupFunc(ctx, rg.Subscription)
if err != nil {
return nil, fmt.Errorf("failed to lookup subscription ID for %q: %w", rg.Subscription, err)
}
executionTarget := executionTargetImpl{
subscriptionName: rg.Subscription,
subscriptionID: subscriptionID,
region: options.Region,
resourceGroup: rg.Name,
aksClusterName: rg.AKSCluster,
}
err = RunResourceGroup(rg, ctx, options, &executionTarget, outPuts)
if err != nil {
return nil, err
}
}
return outPuts, nil
}
func RunResourceGroup(rg *ResourceGroup, ctx context.Context, options *PipelineRunOptions, executionTarget ExecutionTarget, outputs map[string]output) error {
logger := logr.FromContextOrDiscard(ctx)
kubeconfigFile, err := executionTarget.KubeConfig(ctx)
if kubeconfigFile != "" {
defer func() {
if err := os.Remove(kubeconfigFile); err != nil {
logger.V(5).Error(err, "failed to delete kubeconfig file", "kubeconfig", kubeconfigFile)
}
}()
} else if err != nil {
return fmt.Errorf("failed to prepare kubeconfig: %w", err)
}
for _, step := range rg.Steps {
// execute
output, err := RunStep(
step,
logr.NewContext(
ctx,
logger.WithValues(
"step", step.StepName(),
"subscription", executionTarget.GetSubscriptionID(),
"resourceGroup", executionTarget.GetResourceGroup(),
"aksCluster", executionTarget.GetAkSClusterName(),
),
),
kubeconfigFile,
executionTarget, options,
outputs,
)
if err != nil {
return err
}
if output != nil {
outputs[step.StepName()] = output
}
}
return nil
}
func RunStep(s Step, ctx context.Context, kubeconfigFile string, executionTarget ExecutionTarget, options *PipelineRunOptions, outPuts map[string]output) (output, error) {
if options.Step != "" && s.StepName() != options.Step {
// skip steps that don't match the specified step name
return nil, nil
}
fmt.Println("\n---------------------")
if options.DryRun {
fmt.Println("This is a dry run!")
}
fmt.Println(s.Description())
fmt.Print("\n")
switch step := s.(type) {
case *ShellStep:
return nil, runShellStep(step, ctx, kubeconfigFile, options, outPuts)
case *ARMStep:
a := newArmClient(executionTarget.GetSubscriptionID(), executionTarget.GetRegion())
if a == nil {
return nil, fmt.Errorf("failed to create ARM client")
}
output, err := a.runArmStep(ctx, options, executionTarget.GetResourceGroup(), step, outPuts)
if err != nil {
return nil, fmt.Errorf("failed to run ARM step: %w", err)
}
return output, nil
default:
fmt.Println("No implementation for action type - skip", s.ActionType())
return nil, nil
}
}
func getInputValues(configuredVariables []Variable, cfg config.Configuration, inputs map[string]output) (map[string]any, error) {
values := make(map[string]any)
for _, i := range configuredVariables {
if i.Input != nil {
if v, found := inputs[i.Input.Step]; found {
value, err := v.GetValue(i.Input.Name)
if err != nil {
return nil, fmt.Errorf("failed to get value for input %s.%s: %w", i.Input.Step, i.Input.Name, err)
}
values[i.Name] = value.Value
} else {
return nil, fmt.Errorf("step %s not found in provided outputs", i.Input.Step)
}
} else if i.ConfigRef != "" {
value, found := cfg.GetByPath(i.ConfigRef)
if !found {
return nil, fmt.Errorf("failed to lookup config reference %s for %s", i.ConfigRef, i.Name)
}
values[i.Name] = value
} else {
values[i.Name] = i.Value
}
}
return values, nil
}