// cli_tools/common/image/importer/importer.go
// Copyright 2019 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importer
import (
"context"
"log"
"path"
"sync"
"time"
daisy "github.com/GoogleCloudPlatform/compute-daisy"
daisyCompute "github.com/GoogleCloudPlatform/compute-daisy/compute"
"google.golang.org/api/googleapi"
"github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/disk"
"github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/domain"
"github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/imagefile"
"github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/utils/logging"
"github.com/GoogleCloudPlatform/compute-image-import/proto/go/pb"
)
// Importer creates a GCE disk image from a source disk file or image.
//
// Run performs the import and blocks until it completes, is cancelled via
// ctx, or fails; it returns nil on success.
//
//go:generate go run github.com/golang/mock/mockgen -package imagemocks -source $GOFILE -destination mocks/importer_mocks.go
type Importer interface {
	Run(ctx context.Context) error
}
// NewImporter constructs an Importer instance.
//
// It validates the request, builds the inflater and disk inspector up front,
// and returns an error if any of those preparatory steps fail.
func NewImporter(request ImageImportRequest, computeClient daisyCompute.Client, storageClient domain.StorageClientInterface, logger logging.Logger) (Importer, error) {
	if err := request.validate(); err != nil {
		return nil, err
	}

	diskInflater, err := NewInflater(request, computeClient, storageClient, imagefile.NewGCSInspector(), logger)
	if err != nil {
		return nil, err
	}

	diskInspector, err := disk.NewInspector(request.EnvironmentSettings(), logger)
	if err != nil {
		return nil, err
	}

	provider := defaultProcessorProvider{
		request,
		computeClient,
		newProcessPlanner(request, diskInspector, logger),
		logger,
	}

	imp := &importer{
		project:           request.Project,
		zone:              request.Zone,
		timeout:           request.Timeout,
		preValidator:      newPreValidator(request, computeClient),
		inflater:          diskInflater,
		processorProvider: provider,
		diskClient:        computeClient,
		logger:            logger,
	}
	return imp, nil
}
// importer is an implementation of Importer that uses a combination of Daisy workflows
// and GCP API calls.
type importer struct {
	project, zone string
	// pd is the scratch persistent disk produced by inflation and threaded
	// through each processor; its URI is also used for cleanup in deleteDisk.
	pd                persistentDisk
	preValidator      validator
	inflater          Inflater
	processorProvider processorProvider
	diskClient        diskClient
	logger            logging.Logger
	// timeout bounds the whole import; a zero value means no timeout.
	timeout time.Duration
}
// Run performs the import: validation, inflation of the source into a
// persistent disk, and processing of that disk. The scratch disk is deleted
// on exit regardless of success. A non-zero i.timeout bounds the whole run.
func (i *importer) Run(ctx context.Context) error {
	// Apply the user-requested timeout, if any, to every subsequent step.
	if i.timeout > 0 {
		var cancel func()
		ctx, cancel = context.WithTimeout(ctx, i.timeout)
		defer cancel()
	}

	if err := i.preValidator.validate(); err != nil {
		return err
	}

	// Clean up the scratch disk whether or not inflate/process succeed.
	defer i.deleteDisk()

	if err := i.runInflate(ctx); err != nil {
		return err
	}
	// Previously: err := i.runProcess(ctx); if err != nil { return err }; return err
	// — the tail was redundant; return the result directly.
	return i.runProcess(ctx)
}
// runInflate converts the import source into a GCE persistent disk,
// recording size metrics when inflation reported a disk size.
// (The previous version declared a named result `err` on the outer function
// that was never used — the closure shadowed it; it has been removed.)
func (i *importer) runInflate(ctx context.Context) error {
	return i.runStep(ctx, func() error {
		var err error
		// i.pd is assigned even when Inflate fails: it may reference a
		// partially-created disk that deleteDisk must clean up.
		i.pd, _, err = i.inflater.Inflate()
		// Metrics are emitted whenever a size was reported, even on error,
		// preserving the original best-effort telemetry behavior.
		if i.pd.sizeGb > 0 {
			i.logger.Metric(&pb.OutputInfo{
				SourcesSizeGb:    []int64{i.pd.sourceGb},
				TargetsSizeGb:    []int64{i.pd.sizeGb},
				ImportFileFormat: i.pd.sourceType,
			})
		}
		return err
	}, i.inflater.Cancel)
}
// runProcess obtains the processors for the inflated disk and executes them
// in order, threading the (possibly updated) persistent disk through each one.
// The first failing step aborts the chain and its error is returned.
func (i *importer) runProcess(ctx context.Context) error {
	processors, err := i.processorProvider.provide(i.pd)
	if err != nil {
		return err
	}
	for _, proc := range processors {
		proc := proc
		stepErr := i.runStep(ctx, func() error {
			// i.pd is updated even on failure, mirroring process's contract.
			updated, processErr := proc.process(i.pd)
			i.pd = updated
			return processErr
		}, proc.cancel)
		if stepErr != nil {
			return stepErr
		}
	}
	return nil
}
// runStep executes step, honoring cancellation/expiration of ctx. On timeout,
// cancel is invoked with the reason "timed-out"; cancel returning true means
// the step was actually interrupted (false means it had already finished, so
// the step is treated as successful despite the deadline).
func (i *importer) runStep(ctx context.Context, step func() error, cancel func(string) bool) (err error) {
	// Buffered (capacity 1) so the goroutine below can always complete its
	// send even when the timeout branch wins the select and nobody drains
	// the channel. With an unbuffered channel that send blocked forever,
	// leaking the goroutine on every timed-out step.
	e := make(chan error, 1)
	var wg sync.WaitGroup
	// Add before launching the goroutine: calling Add inside the goroutine
	// raced with wg.Wait below, which could return before Add ran.
	wg.Add(1)
	go func() {
		defer wg.Done()
		// If the context expired before runStep was called, skip the step.
		select {
		case <-ctx.Done():
			e <- i.getCtxError(ctx)
		default:
			e <- step()
		}
	}()

	// Wait for either context expiration or step completion (error or success).
	select {
	case <-ctx.Done():
		if cancel("timed-out") {
			// Only return a timeout error if the step was cancelled in time.
			// Otherwise the step finished and the import succeeded even
			// though it timed out.
			err = i.getCtxError(ctx)
		}
		// Ensure the step goroutine has fully exited before returning.
		wg.Wait()
	case stepErr := <-e:
		err = stepErr
	}
	return err
}
// getCtxError translates a context error into a user-facing error: a
// deadline expiry becomes a descriptive timeout message mentioning the
// configured timeout; any other context error is passed through unchanged.
func (i *importer) getCtxError(ctx context.Context) error {
	ctxErr := ctx.Err()
	if ctxErr == context.DeadlineExceeded {
		return daisy.Errf("Import did not complete within the specified timeout of %s", i.timeout)
	}
	return ctxErr
}
// deleteDisk removes the importer's scratch persistent disk, if one was
// created. Invoked via defer in Run so cleanup happens on every exit path.
func (i *importer) deleteDisk() {
	deleteDisk(i.diskClient, i.project, i.zone, i.pd)
}
// deleteDisk deletes the temporary disk referenced by pd, if any. Failures
// are logged (best effort) rather than returned; a 404 from the API is
// ignored entirely since it means the disk is already gone.
func deleteDisk(diskClient diskClient, project string, zone string, pd persistentDisk) {
	if pd.uri == "" {
		// No disk was ever created; nothing to clean up.
		return
	}
	diskName := path.Base(pd.uri)
	if err := diskClient.DeleteDisk(project, zone, diskName); err != nil {
		// Ignore only "not found": the disk is already deleted.
		if gAPIErr, isGAPIErr := err.(*googleapi.Error); isGAPIErr && gAPIErr.Code == 404 {
			return
		}
		// Fixed: the previous condition (isGAPIErr && Code != 404) silently
		// swallowed all non-googleapi errors, and the format string used %e
		// (a floating-point verb), rendering the error as %!e(...).
		log.Printf("Failed to remove temporary disk %v: %v", pd, err)
	}
}
// diskClient is the subset of the GCP API that is used by importer.
type diskClient interface {
	// DeleteDisk deletes the named disk in the given project and zone.
	// NOTE(review): despite the parameter name `uri`, callers in this file
	// pass a bare disk name (path.Base of the URI) — confirm against the
	// underlying daisyCompute.Client contract.
	DeleteDisk(project, zone, uri string) error
}