custom-targets/vertex-ai-pipeline/pipeline-deployer/deploy.go (89 lines of code) (raw):
// Copyright 2023 Google LLC
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// deploy.go contains logic to deploy a pipeline to vertex AI.
package main
import (
"context"
"fmt"
"cloud.google.com/go/storage"
"github.com/GoogleCloudPlatform/cloud-deploy-samples/custom-targets/util/clouddeploy"
"google.golang.org/api/aiplatform/v1"
"sigs.k8s.io/yaml"
)
const (
	// aiDeployerSampleName is the name of this sample; it is written into the
	// deploy result metadata as the custom target source.
	aiDeployerSampleName = "clouddeploy-vertex-ai-pipeline-sample"

	// localManifest is the local file path used for the downloaded manifest.
	localManifest = "manifest.yaml"
)
// deployer implements the handler interface to deploy a pipeline using the vertex AI API.
// It bundles the clients, parameters and request that the deploy flow reads.
type deployer struct {
// gcsClient is the Cloud Storage client passed to the request's manifest
// download and result/artifact upload calls.
gcsClient *storage.Client
// aiPlatformService is the Vertex AI API service handed to deployPipeline.
aiPlatformService *aiplatform.Service
// params supplies the project and location used to build the parent
// resource name for the pipeline job.
params *params
// req is the Cloud Deploy deploy request being processed.
req *clouddeploy.DeployRequest
}
// process handles the Deploy request: it runs the Vertex AI pipeline
// deployment and uploads a DeployResult describing the outcome through
// d.req.UploadResult.
//
// It returns nil only when the deployment succeeded and its result was
// uploaded. When the deployment fails, a DeployResult with status DeployFailed
// and the error text as its failure message is uploaded, and the deployment
// error is returned. If uploading a result fails, the upload error is returned
// instead.
func (d *deployer) process(ctx context.Context) error {
	fmt.Println("Processing deploy request")

	res, err := d.deploy(ctx)
	if err != nil {
		fmt.Printf("Deploy failed: %v\n", err)
		dr := &clouddeploy.DeployResult{
			ResultStatus:   clouddeploy.DeployFailed,
			FailureMessage: err.Error(),
		}
		d.addCommonMetadata(dr)

		fmt.Println("Uploading failed deploy results")
		// The upload error gets its own name. Declaring a second `err` with :=
		// in this scope would shadow the deployment error, and the return below
		// would then always yield nil even though the deployment failed.
		rURI, uploadErr := d.req.UploadResult(ctx, d.gcsClient, dr)
		if uploadErr != nil {
			return fmt.Errorf("error uploading failed deploy results: %v", uploadErr)
		}
		fmt.Printf("Uploaded failed deploy results to %s\n", rURI)
		// Surface the original deployment failure to the caller.
		return err
	}

	d.addCommonMetadata(res)

	fmt.Println("Uploading successful deploy results")
	rURI, err := d.req.UploadResult(ctx, d.gcsClient, res)
	if err != nil {
		return fmt.Errorf("error uploading deploy results: %v", err)
	}
	fmt.Printf("Uploaded deploy results to %s\n", rURI)
	return nil
}
// deploy runs the Vertex AI pipeline deployment in three steps: download the
// manifest, apply the pipeline described by it, and upload the applied request
// as a deploy artifact named "manifest.yaml".
//
// On success it returns a DeployResult with status DeploySucceeded whose
// ArtifactFiles holds the URI of the uploaded artifact. Any failing step
// aborts the deployment and returns a nil result with the error.
func (d *deployer) deploy(ctx context.Context) (*clouddeploy.DeployResult, error) {
	if dlErr := d.downloadManifest(ctx); dlErr != nil {
		return nil, dlErr
	}

	applied, applyErr := d.applyPipeline(ctx, localManifest)
	if applyErr != nil {
		return nil, fmt.Errorf("failed to deploy pipeline: %v", applyErr)
	}

	// Record the request that was actually applied as a deploy artifact.
	content := &clouddeploy.GCSUploadContent{Data: applied}
	artifactURI, uploadErr := d.req.UploadArtifact(ctx, d.gcsClient, "manifest.yaml", content)
	if uploadErr != nil {
		return nil, fmt.Errorf("error uploading deploy artifact: %v", uploadErr)
	}

	result := &clouddeploy.DeployResult{
		ResultStatus:  clouddeploy.DeploySucceeded,
		ArtifactFiles: []string{artifactURI},
	}
	return result, nil
}
// downloadManifest asks the deploy request to download the manifest located at
// d.req.ManifestGCSPath, using localManifest as the local path. Progress and
// failure are logged to stdout; a failure is also returned as an error that
// names the manifest's Cloud Storage path.
func (d *deployer) downloadManifest(ctx context.Context) error {
	fmt.Printf("Downloading deploy input manifest from %q.\n", d.req.ManifestGCSPath)

	downloaded, dlErr := d.req.DownloadManifest(ctx, d.gcsClient, localManifest)
	if dlErr != nil {
		fmt.Printf("Unable to download deployed manifest from: %s.\n", d.req.ManifestGCSPath)
		return fmt.Errorf("unable to download deploy input from %s: %v", d.req.ManifestGCSPath, dlErr)
	}

	fmt.Printf("Downloaded deploy input manifest from: %s\n", downloaded)
	return nil
}
// addCommonMetadata stamps the deploy result with the metadata that must be
// present whether the deploy succeeded or failed: the custom target source
// (this sample's name) and the source SHA (clouddeploy.GitCommit). The
// metadata map is created first when the result does not have one yet.
func (d *deployer) addCommonMetadata(rs *clouddeploy.DeployResult) {
	md := rs.Metadata
	if md == nil {
		// Writing to a nil map panics, so allocate one and attach it to the result.
		md = map[string]string{}
		rs.Metadata = md
	}
	md[clouddeploy.CustomTargetSourceMetadataKey] = aiDeployerSampleName
	md[clouddeploy.CustomTargetSourceSHAMetadataKey] = clouddeploy.GitCommit
}
// applyPipeline deploys the CreatePipelineJobRequest parsed from the manifest
// file at localManifest. The request is submitted under the parent
// "projects/<project>/locations/<location>" built from the deployer's params.
// It returns the CreatePipelineJobRequest that was used, marshalled as YAML.
func (d *deployer) applyPipeline(ctx context.Context, localManifest string) ([]byte, error) {
	request, loadErr := pipelineRequestFromManifest(localManifest)
	if loadErr != nil {
		return nil, fmt.Errorf("unable to load CreatePipelineJobRequest from manifest: %v", loadErr)
	}

	parentName := fmt.Sprintf("projects/%s/locations/%s", d.params.project, d.params.location)
	deployErr := deployPipeline(ctx, d.aiPlatformService, parentName, request)
	if deployErr != nil {
		return nil, fmt.Errorf("unable to deploy pipeline: %v", deployErr)
	}

	return yaml.Marshal(request)
}