custom-targets/vertex-ai-pipeline/pipeline-deployer/request.go (96 lines of code) (raw):
// Copyright 2023 Google LLC
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// https://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"cloud.google.com/go/storage"
"github.com/GoogleCloudPlatform/cloud-deploy-samples/custom-targets/util/clouddeploy"
"google.golang.org/api/aiplatform/v1"
)
// Environment variable keys specific to the vertex ai deployer. These are provided via
// deploy parameters in Cloud Deploy.
const (
pipelineEnvKey = "CLOUD_DEPLOY_customTarget_vertexAIPipeline"
configPathKey = "CLOUD_DEPLOY_customTarget_vertexAIPipelineJobConfiguration"
paramValsKey = "CLOUD_DEPLOY_customTarget_vertexAIPipelineJobParameterValues"
locValsKey = "CLOUD_DEPLOY_customTarget_location"
projectValsKey = "CLOUD_DEPLOY_customTarget_projectID"
)
// requestHandler interface provides methods for handling the Cloud Deploy params.
type requestHandler interface {
// Process processes the Cloud Deploy params.
process(ctx context.Context) error
}
// createRequestHandler creates a requestHandler for the provided Cloud Deploy request.
func createRequestHandler(cloudDeployRequest interface{}, params *params, gcsClient *storage.Client, service *aiplatform.Service) (requestHandler, error) {
switch r := cloudDeployRequest.(type) {
case *clouddeploy.RenderRequest:
return &renderer{
req: r,
params: params,
gcsClient: gcsClient,
aiPlatformService: service,
}, nil
case *clouddeploy.DeployRequest:
return &deployer{
req: r,
params: params,
gcsClient: gcsClient,
aiPlatformService: service,
}, nil
default:
return nil, fmt.Errorf("received unsupported cloud deploy request type: %q", os.Getenv(clouddeploy.RequestTypeEnvKey))
}
}
// params contains the deploy parameter values passed into the execution environment.
type params struct {
// The Project ID for the target environment.
project string
// The location where the ML pipeline will be deployed
location string
//T he pipeline template that is being deployed
pipeline string
// The directory path where the renderer should look for target-specific configuration
// for this deployment, if not provided the renderer will check for a pipelineJob.yaml
// file in the root working directory.
configPath string
// Pipeline parameters obtained via deploy parameters. Hold parameters necessary
// for the createPipelineJobRequest, such as the prompt dataset
pipelineParams map[string]string
}
// determineParams returns the supported params provided in the execution environment via environment variables.
func determineParams() (*params, error) {
location, found := os.LookupEnv(locValsKey)
if !found {
return nil, fmt.Errorf("environment variable %s not found", locValsKey)
}
if location == "" {
return nil, fmt.Errorf("environment variable %s contains empty string", locValsKey)
}
project, found := os.LookupEnv(projectValsKey)
if !found {
return nil, fmt.Errorf("required environment variable %s not found", projectValsKey)
}
if project == "" {
return nil, fmt.Errorf("environment variable %s contains empty string", projectValsKey)
}
pipeline, found := os.LookupEnv(pipelineEnvKey)
if !found {
return nil, fmt.Errorf("required environment variable %s not found", pipelineEnvKey)
}
if pipeline == "" {
return nil, fmt.Errorf("environment variable %s contains empty string", pipelineEnvKey)
}
paramString, found := os.LookupEnv(paramValsKey)
if !found {
return nil, fmt.Errorf("required environment variable %s not found", paramValsKey)
}
var pipelineParams map[string]string
err := json.Unmarshal([]byte(paramString), &pipelineParams)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal params json")
}
if len(pipelineParams) == 0 {
return nil, fmt.Errorf("environment variable %s contains empty string", paramValsKey)
}
config, found := os.LookupEnv(configPathKey)
if !found {
return nil, fmt.Errorf("required environment variable %s not found", configPathKey)
}
if config == "" {
return nil, fmt.Errorf("environment variable %s contains empty string", configPathKey)
}
return ¶ms{
project: project,
pipeline: pipeline,
configPath: config,
location: location,
pipelineParams: pipelineParams,
}, nil
}