cli_tools/common/image/importer/inflater.go (241 lines of code) (raw):
// Copyright 2020 Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importer
import (
"context"
"fmt"
"regexp"
"time"
daisy "github.com/GoogleCloudPlatform/compute-daisy"
daisyCompute "github.com/GoogleCloudPlatform/compute-daisy/compute"
"github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/domain"
"github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/imagefile"
"github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/utils/daisyutils"
"github.com/GoogleCloudPlatform/compute-image-import/cli_tools/common/utils/logging"
"github.com/GoogleCloudPlatform/compute-image-import/proto/go/pb"
)
// Inflater constructs a new persistentDisk, typically starting from a
// frozen representation of a disk, such as a VMDK file or a GCP disk image.
type Inflater interface {
// Inflate performs the inflation. It returns the resulting disk together
// with the metrics (inflationInfo) collected for the run, or an error.
Inflate() (persistentDisk, inflationInfo, error)
// Cancel asks the inflater to stop; reason describes why. Callers in this
// file treat a false return value as a failed cancellation/cleanup.
Cancel(reason string) bool
}
// persistentDisk describes a disk produced by an Inflater.
type persistentDisk struct {
// uri is the disk's URI; it is what GetDiskURI returns.
uri string
// sizeGb and sourceGb are sizes in GB; presumably the size of the created
// disk and of the source file respectively — TODO confirm against the
// concrete inflaters.
sizeGb int64
sourceGb int64
// sourceType presumably names the format of the source file — TODO confirm.
sourceType string
}
// inflationInfo carries the metrics collected for a single inflation run.
type inflationInfo struct {
// Below fields are for inflation metrics
// checksum is compared (via isChecksumMatch) against other checksums to
// validate the content of the inflated disk.
checksum string
// inflationTime is reported, in milliseconds, as the InflationTimeMs metric.
inflationTime time.Duration
// inflationType is reported as the InflationType metric by
// shadowTestInflaterFacade.
inflationType string
}
// NewInflater builds the Inflater that creates a 1:1 data copy of the
// requested source on a GCP disk, using the PD API, a Daisy workflow, or a
// facade combining both.
//
// For sources that are not GCP images the file is inspected first:
//  1. To reduce the runtime permissions used on the inflation worker, disks
//     sufficient to hold the disk file and the inflated disk are
//     pre-allocated. If inspection fails, the default values in the daisy
//     workflow are used. The scratch disk gets a padding factor to account
//     for filesystem overhead.
//  2. Inspection also yields a checksum of the image file for a sanity check.
//     If the checksum is unavailable, that check is skipped.
//
// An error is returned only when the Daisy inflater cannot be constructed.
func NewInflater(request ImageImportRequest, computeClient daisyCompute.Client, storageClient domain.StorageClientInterface,
	inspector imagefile.Inspector, logger logging.Logger) (Inflater, error) {

	// Switch controlling whether native PD inflation is used, either as the
	// primary inflation method or in a shadow test mode.
	const useNativePDInflation = true

	sourceIsImage := isImage(request.Source)

	var metadata imagefile.Metadata
	if !sourceIsImage {
		ctx, cancel := context.WithTimeout(context.Background(), inspectionTimeout)
		defer cancel()

		logger.User("Inspecting the image file...")
		// The inspection error is deliberately dropped: inflation proceeds
		// with whatever metadata is available (see the function comment).
		metadata, _ = inspector.Inspect(ctx, request.Source.Path())
	}

	daisyInflater, err := newDaisyInflater(request, metadata, logger)
	if err != nil {
		return nil, err
	}

	if sourceIsImage || !useNativePDInflation {
		return daisyInflater, nil
	}

	checksum := metadata.Checksum

	if isShadowTestFormat(request) {
		shadow := createAPIInflater(&apiInflaterProperties{request, computeClient, storageClient, logger, true, true})
		return &shadowTestInflaterFacade{
			mainInflater:   daisyInflater,
			shadowInflater: shadow,
			logger:         logger,
			qemuChecksum:   checksum,
		}, nil
	}

	primary := createAPIInflater(&apiInflaterProperties{request, computeClient, storageClient, logger, false, checksum != ""})
	return &inflaterFacade{
		apiInflater:   primary,
		daisyInflater: daisyInflater,
		logger:        logger,
		qemuChecksum:  checksum,
		computeClient: computeClient,
		request:       request,
	}, nil
}
// isShadowTestFormat reports whether the request should be inflated through
// shadowTestInflaterFacade. It currently always returns false, so NewInflater
// never selects the shadow-test facade.
func isShadowTestFormat(request ImageImportRequest) bool {
// TODO: process VHD/VPC differently. b/216323357
return false
}
// inflaterFacade implements an inflater using other concrete implementations:
// Inflate tries apiInflater first and falls back to daisyInflater.
type inflaterFacade struct {
// apiInflater is the primary inflater; it is only run when qemuChecksum is
// non-empty.
apiInflater Inflater
// daisyInflater is the fallback inflater, and the only one Cancel is
// forwarded to.
daisyInflater Inflater
// logger receives user-facing messages and inflation metrics.
logger logging.Logger
// qemuChecksum is the checksum the API inflater's result is validated
// against; when empty, the API inflater is skipped entirely.
qemuChecksum string
// computeClient is used to delete the disk after a checksum mismatch.
computeClient daisyCompute.Client
// request supplies the execution ID, project and zone identifying the disk.
request ImageImportRequest
}
// Inflate creates the disk with the API inflater when possible and otherwise
// falls back to the daisy inflater. The fallback happens for one of three
// reasons:
//  1. QEMU checksum is missing, so the API result could not be verified.
//  2. The API failed because it doesn't support the format of the image file.
//  3. Checksum mismatch, which means the API produced a corrupted disk; that
//     disk is deleted before falling back.
//
// Any other API failure is returned as-is without rerunning inflation.
func (facade *inflaterFacade) Inflate() (persistentDisk, inflationInfo, error) {
	if facade.qemuChecksum == "" {
		return facade.fallbackToDaisyInflater("qemu_checksum_missing")
	}

	// Run API inflater as the primary inflation method.
	disk, info, apiErr := facade.apiInflater.Inflate()
	if apiErr != nil {
		if isCausedByUnsupportedFormat(apiErr) {
			return facade.fallbackToDaisyInflater("unsupported_format")
		}
		// Failed for a reason other than an unsupported format: report it
		// and do not rerun inflation.
		facade.logger.Metric(&pb.OutputInfo{
			InflationType:   "api_failed",
			InflationTimeMs: []int64{info.inflationTime.Milliseconds()},
		})
		return disk, info, apiErr
	}

	if isChecksumMatch(info.checksum, facade.qemuChecksum) {
		facade.logger.Metric(&pb.OutputInfo{
			InflationType:   "api_success",
			InflationTimeMs: []int64{info.inflationTime.Milliseconds()},
		})
		return disk, info, nil
	}

	// Checksum mismatch: delete the corrupted disk before recreating it.
	corruptedDisk := getDiskName(facade.request.ExecutionID)
	if deleteErr := facade.computeClient.DeleteDisk(facade.request.Project, facade.request.Zone, corruptedDisk); deleteErr != nil {
		return disk, info, daisy.Errf("Tried to delete the disk after checksum mismatch is detected, but failed on: %v", deleteErr)
	}
	facade.logger.User("Disk checksum mismatch, recreating...")
	return facade.fallbackToDaisyInflater("checksum_mismatch")
}
// fallbackToDaisyInflater runs the daisy inflater and emits a metric recording
// whether it succeeded ("qemu_success") or failed ("qemu_failed"), how long it
// took, and the reason the fallback was needed. The daisy inflater's results
// are returned unchanged.
func (facade *inflaterFacade) fallbackToDaisyInflater(reason string) (persistentDisk, inflationInfo, error) {
	disk, info, err := facade.daisyInflater.Inflate()

	outcome := "qemu_success"
	if err != nil {
		outcome = "qemu_failed"
	}

	facade.logger.Metric(&pb.OutputInfo{
		InflationType:           outcome,
		InflationTimeMs:         []int64{info.inflationTime.Milliseconds()},
		InflationFallbackReason: reason,
	})
	return disk, info, err
}
// Cancel forwards the cancellation to the daisy inflater and returns its
// result. The API inflater is not canceled (no need to).
func (facade *inflaterFacade) Cancel(reason string) bool {
	canceled := facade.daisyInflater.Cancel(reason)
	return canceled
}
// shadowTestInflaterFacade implements an inflater with shadow test support:
// it runs a main and a shadow inflater side by side, always returns the main
// inflater's result, and reports how the two results compare.
type shadowTestInflaterFacade struct {
// mainInflater produces the result that Inflate returns.
mainInflater Inflater
// shadowInflater runs concurrently; its result is only used for comparison.
shadowInflater Inflater
// logger receives the comparison metric.
logger logging.Logger
// Runtime-populated fields
// qemuChecksum is compared against the main inflater's checksum; when empty
// that comparison is reported as "skipped".
qemuChecksum string
}
// Signals sent on the channel inside shadowTestInflaterFacade.Inflate. Each
// inflater goroutine sends exactly one of its two signals: "done" when its
// Inflate call returned no error, "err" otherwise. The order of arrival tells
// which inflater finished first and controls the verification of the shadow
// inflater's result.
const (
sigMainInflaterDone = "main done"
sigMainInflaterErr = "main err"
sigShadowInflaterDone = "shadow done"
sigShadowInflaterErr = "shadow err"
)
// Inflate runs the main and the shadow inflater concurrently and always
// returns the main inflater's result.
//
// If the main inflater finishes first, the shadow inflater is canceled and the
// main result is returned immediately. If the shadow inflater finishes first,
// Inflate waits for the main inflater, compares both outcomes and reports the
// comparison through a logger metric.
func (facade *shadowTestInflaterFacade) Inflate() (persistentDisk, inflationInfo, error) {
	// Each goroutine below sends exactly one signal. The buffer holds both so
	// a sender never blocks once this function has returned; with an
	// unbuffered channel the shadow goroutine was leaked whenever the main
	// inflater finished first, because nobody received its signal.
	inflaterChan := make(chan string, 2)

	// Launch main inflater.
	var pd persistentDisk
	var ii inflationInfo
	var err error
	go func() {
		pd, ii, err = facade.mainInflater.Inflate()
		if err != nil {
			inflaterChan <- sigMainInflaterErr
		} else {
			inflaterChan <- sigMainInflaterDone
		}
	}()

	// Launch shadow inflater.
	var shadowPd persistentDisk
	var shadowIi inflationInfo
	var shadowErr error
	go func() {
		shadowPd, shadowIi, shadowErr = facade.shadowInflater.Inflate()
		if shadowErr != nil {
			inflaterChan <- sigShadowInflaterErr
		} else {
			inflaterChan <- sigShadowInflaterDone
		}
	}()

	var matchResult string

	// Return early if main inflater finished first. Receiving the signal
	// orders the goroutine's writes to pd/ii/err before the reads below.
	result := <-inflaterChan
	if result == sigMainInflaterDone || result == sigMainInflaterErr {
		// NOTE(review): matchResult is computed on this path but never
		// reported, because no metric is emitted before returning.
		if result == sigMainInflaterDone {
			matchResult = "Main inflater finished earlier"
		} else {
			matchResult = "Main inflater failed earlier"
		}

		// Wait for shadowInflater.inflate() to be canceled. Otherwise, shadowInflater.inflate() may
		// be interrupted with temporary resources left: b/169073057
		if cancelResult := facade.shadowInflater.Cancel("cleanup shadow PD"); !cancelResult {
			matchResult += " cleanup failed"
		}
		return pd, ii, err
	}

	// Wait for main inflater to finish, then process shadow inflater's result.
	mainResult := <-inflaterChan
	if result == sigShadowInflaterDone {
		if mainResult == sigMainInflaterErr {
			matchResult = "Main inflater failed while shadow inflater succeeded"
		} else {
			matchResult = facade.compareWithShadowInflater(&pd, &shadowPd, &ii, &shadowIi)
		}
	} else if result == sigShadowInflaterErr && mainResult == sigMainInflaterDone {
		if isCausedByUnsupportedFormat(shadowErr) {
			matchResult = "Shadow inflater doesn't support the format while main inflater supports"
		} else if isCausedByAlphaAPIAccess(shadowErr) {
			matchResult = "Shadow inflater not executed: no Alpha API access"
		} else {
			matchResult = fmt.Sprintf("Shadow inflater failed while main inflater succeeded: [%v]", shadowErr)
		}
	}
	// When both inflaters failed, matchResult is left empty.

	facade.logger.Metric(&pb.OutputInfo{
		ShadowDiskMatchResult: matchResult,
		InflationType:         ii.inflationType,
		InflationTimeMs:       []int64{ii.inflationTime.Milliseconds()},
		ShadowInflationTimeMs: []int64{shadowIi.inflationTime.Milliseconds()},
	})
	return pd, ii, err
}
// Cancel cancels the shadow inflater first, then the main inflater. Only the
// main inflater's result is returned; the shadow inflater's result is
// discarded.
func (facade *shadowTestInflaterFacade) Cancel(reason string) bool {
	_ = facade.shadowInflater.Cancel(reason)
	mainCanceled := facade.mainInflater.Cancel(reason)
	return mainCanceled
}
// matchFormat renders the individual comparison results when the main and
// shadow inflaters do not fully agree.
const matchFormat = "sizeGb-%v,sourceGb-%v,content-%v,qemuchecksum-%v"

// compareWithShadowInflater compares the disks and checksums produced by the
// main and shadow inflaters, and the main checksum against the QEMU checksum.
// It returns "true" when everything matches; otherwise it returns a
// matchFormat string listing each individual result. The QEMU comparison is
// reported as "skipped" when no QEMU checksum is available, which also
// prevents an overall "true".
func (facade *shadowTestInflaterFacade) compareWithShadowInflater(mainPd, shadowPd *persistentDisk, mainIi, shadowIi *inflationInfo) string {
	sameSize := shadowPd.sizeGb == mainPd.sizeGb
	sameSource := shadowPd.sourceGb == mainPd.sourceGb
	sameContent := isChecksumMatch(shadowIi.checksum, mainIi.checksum)

	var qemuVerdict string
	switch {
	case facade.qemuChecksum == "":
		qemuVerdict = "skipped"
	case isChecksumMatch(facade.qemuChecksum, mainIi.checksum):
		qemuVerdict = "true"
	default:
		qemuVerdict = "false"
	}

	if sameSize && sameSource && sameContent && qemuVerdict == "true" {
		return "true"
	}
	return fmt.Sprintf(matchFormat, sameSize, sameSource, sameContent, qemuVerdict)
}
// getDiskName derives the disk name for an execution: the ID is prefixed with
// "disk-" and passed through daisyutils.GenerateValidDisksImagesName.
func getDiskName(executionID string) string {
	rawName := "disk-" + executionID
	return daisyutils.GenerateValidDisksImagesName(rawName)
}
// GetDiskURI returns the URI of a PD disk.
func GetDiskURI(pd persistentDisk) string {
return pd.uri
}
// checksumNoiseRegex matches every run of characters that is not an ASCII
// letter or digit. It is compiled once at package initialization instead of on
// every comparison, and MustCompile surfaces a bad pattern instead of silently
// discarding the compile error.
var checksumNoiseRegex = regexp.MustCompile("[^a-zA-Z0-9]+")

// isChecksumMatch reports whether two checksums are equal once every
// non-alphanumeric character (whitespace, separators, newlines, ...) has been
// stripped from both. The comparison is case-sensitive, and two inputs that
// contain no alphanumeric characters at all compare as equal.
func isChecksumMatch(checksum1, checksum2 string) bool {
	normalized1 := checksumNoiseRegex.ReplaceAllString(checksum1, "")
	normalized2 := checksumNoiseRegex.ReplaceAllString(checksum2, "")
	return normalized1 == normalized2
}