custom-targets/vertex-ai-pipeline/pipeline-deployer/render.go (134 lines of code) (raw):

// Copyright 2023 Google LLC

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

//     https://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"

	"cloud.google.com/go/storage"
	"github.com/GoogleCloudPlatform/cloud-deploy-samples/custom-targets/util/applysetters"
	"github.com/GoogleCloudPlatform/cloud-deploy-samples/custom-targets/util/clouddeploy"
	"google.golang.org/api/aiplatform/v1"
	"sigs.k8s.io/yaml"
)

const (
	// defaultConfigPath is where the pipelineJob configuration file is looked
	// up when no specific location is provided.
	defaultConfigPath = "/workspace/source/pipelineJob.yaml"
	// srcArchivePath is the path used when downloading the source input
	// archive file.
	srcArchivePath = "/workspace/archive.tgz"
	// srcPath is the directory the source input archive is unarchived into.
	// Relative configuration paths are resolved against it.
	srcPath = "/workspace/source"
)

// renderer implements the handler interface for performing a render.
type renderer struct {
	// gcsClient is passed to the request helpers that download the render
	// input and upload the manifest and render results.
	gcsClient *storage.Client
	// aiPlatformService is not referenced by the render code in this file.
	aiPlatformService *aiplatform.Service
	// params supplies the values read while building the request:
	// configPath, pipeline, project and pipelineParams.
	params *params
	// req is the render request whose input is downloaded and to which
	// artifacts and results are uploaded.
	req *clouddeploy.RenderRequest
}

// process processes the Render params by generating the YAML representation of a
// CreatePipelineJobRequest object.
func (r *renderer) process(ctx context.Context) error { fmt.Println("Processing render request") res, err := r.render(ctx) if err != nil { fmt.Printf("Render failed: %v\n", err) res := &clouddeploy.RenderResult{ ResultStatus: clouddeploy.RenderFailed, FailureMessage: err.Error(), } r.addCommonMetadata(res) fmt.Println("Uploading failed render results") rURI, err := r.req.UploadResult(ctx, r.gcsClient, res) if err != nil { return fmt.Errorf("error uploading failed render results: %v", err) } fmt.Printf("Uploaded failed render results to %s\n", rURI) return err } r.addCommonMetadata(res) fmt.Println("Uploading successful render results") rURI, err := r.req.UploadResult(ctx, r.gcsClient, res) if err != nil { return fmt.Errorf("error uploading render results: %v", err) } fmt.Printf("Uploaded render results to %s\n", rURI) return nil } func (r *renderer) render(ctx context.Context) (*clouddeploy.RenderResult, error) { fmt.Printf("Downloading render input archive to %s and unarchiving to %s\n", srcArchivePath, srcPath) inURI, err := r.req.DownloadAndUnarchiveInput(ctx, r.gcsClient, srcArchivePath, srcPath) if err != nil { return nil, fmt.Errorf("unable to download and unarchive render input: %v", err) } fmt.Printf("Downloaded render input archive from %s\n", inURI) out, err := r.renderCreatePipelineRequest() if err != nil { return nil, fmt.Errorf("error rendering createPipelineJobRequest params: %v", err) } fmt.Printf("Uploading deployed pipeline manifest.\n") mURI, err := r.req.UploadArtifact(ctx, r.gcsClient, "manifest.yaml", &clouddeploy.GCSUploadContent{Data: out}) if err != nil { return nil, fmt.Errorf("error uploading createPipelineJobRequest manifest: %v", err) } fmt.Printf("Uploaded createPipelineJobRequest manifest to %s\n", mURI) return &clouddeploy.RenderResult{ ResultStatus: clouddeploy.RenderSucceeded, ManifestFile: mURI, }, nil } // renderCreatePipelineRequest generates a CreatePipelineJobRequest object and returns its definition as a yaml-formatted string 
func (r *renderer) renderCreatePipelineRequest() ([]byte, error) { if err := applyDeployParams(r.params.configPath); err != nil { return nil, fmt.Errorf("cannot apply deploy parameters to configuration file: %v", err) } configuration, err := loadConfigurationFile(r.params.configPath) if err != nil { return nil, fmt.Errorf("unable to obtain configuration data: %v", err) } // blank pipelineJob template pipelineJob := &aiplatform.GoogleCloudAiplatformV1PipelineJob{} if err = yaml.Unmarshal(configuration, pipelineJob); err != nil { return nil, fmt.Errorf("unable to parse configuration data into pipelineJob object: %v", err) } paramValues := r.params.pipelineParams if pipelineJob.TemplateUri == "" { pipelineJob.TemplateUri = r.params.pipeline } if pipelineJob.DisplayName == "" { pipelineJob.DisplayName = paramValues["model_display_name"] } paramValues["project_id"] = r.params.project paramString, err := json.Marshal(paramValues) if err != nil { fmt.Printf("Error marshalling JSON: %s", err) return nil, fmt.Errorf("unable to marshal params json") } pipelineJob.RuntimeConfig.ParameterValues = paramString request := &aiplatform.GoogleCloudAiplatformV1CreatePipelineJobRequest{PipelineJob: pipelineJob} return yaml.Marshal(request) } // addCommonMetadata inserts metadata into the render result that should be present // regardless of render success or failure. func (r *renderer) addCommonMetadata(rs *clouddeploy.RenderResult) { if rs.Metadata == nil { rs.Metadata = map[string]string{} } rs.Metadata[clouddeploy.CustomTargetSourceMetadataKey] = aiDeployerSampleName rs.Metadata[clouddeploy.CustomTargetSourceSHAMetadataKey] = clouddeploy.GitCommit } // applyDeployParams replaces templated parameters in the pipelineJob manifest with // the actual values derived from deploy parameters. 
func applyDeployParams(configPath string) error { fullPath, _ := determineConfigFileLocation(configPath) deployParams := clouddeploy.FetchDeployParameters() return applysetters.ApplyParams(fullPath, deployParams) } // determineConfigFileLocation determines where to look for the `pipelineJob.yaml` // configuration file. Since this file is optional, we shouldn't necessarily err // if the file is missing. However, if the configRelativePath is provided it means // that the user specified this value as a deploy-parameter and we should check // that we can open and read the file or fail the render if we cannot. func determineConfigFileLocation(configRelativePath string) (string, bool) { configPath := defaultConfigPath shouldErrOnMissingFile := false if configRelativePath != "" { configPath = fmt.Sprintf("%s/%s", srcPath, configRelativePath) shouldErrOnMissingFile = true } return configPath, shouldErrOnMissingFile } // loadConfigurationFile loads and returns the configuration file for the target if it exists. func loadConfigurationFile(configPath string) ([]byte, error) { filePath, shouldErrOnMissingFile := determineConfigFileLocation(configPath) fmt.Errorf("HERE: %s", filePath) fileInfo, err := os.Stat(filePath) if err != nil && shouldErrOnMissingFile { return nil, err } if fileInfo != nil { return os.ReadFile(filePath) } return nil, nil }