cli_tools/gce_export/main.go
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// export streams a local disk to a Google Compute Engine image file in a
// Google Cloud Storage bucket.
package main

import (
"archive/tar"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/compute-image-tools/cli_tools/common/domain"
"github.com/GoogleCloudPlatform/compute-image-tools/cli_tools/common/utils/logging"
storageutils "github.com/GoogleCloudPlatform/compute-image-tools/cli_tools/common/utils/storage"
"github.com/dustin/go-humanize"
gzip "github.com/klauspost/pgzip"
"google.golang.org/api/option"
htransport "google.golang.org/api/transport/http"
)

const logPrefix = "[gce-export]"

var (
	disk         = flag.String("disk", "", "disk to export; on Linux this would be something like '/dev/sda', and on Windows '\\\\.\\PhysicalDrive1'")
	bufferPrefix = flag.String("buffer_prefix", "", "if set, this local path is used as the prefix for scratch buffer files")
	gcsPath      = flag.String("gcs_path", "", "GCS path to upload the image to, e.g. gs://my-bucket/image.tar.gz")
	oauth        = flag.String("oauth", "", "path to OAuth JSON credentials file")
	licenses     = flag.String("licenses", "", "comma-delimited list of licenses to add to the image")
	noconfirm    = flag.Bool("y", false, "skip confirmation")
	level        = flag.Int("level", 3, "level of compression from 1-9; 1 is best speed, 9 is best compression")
	bufferSize   = flag.String("buffer_size", "1GiB", "max buffer size to use")
	workers      = flag.Int("workers", runtime.NumCPU(), "number of upload workers to use")
)
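
// Example invocation (binary name and flag values below are illustrative,
// not defaults):
//
//	gce_export -disk /dev/sdb -gcs_path gs://my-bucket/image.tar.gz \
//	    -buffer_prefix /tmp/gce-export -buffer_size 2GiB -workers 4 -y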
// progress is an io.Writer that updates total in Write.
type progress struct {
total int64
lock sync.Mutex
}

func (p *progress) Write(b []byte) (int, error) {
p.lock.Lock()
p.total += int64(len(b))
p.lock.Unlock()
return len(b), nil
}
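
// splitLicenses converts the comma-delimited -licenses flag value into a
// slice; an empty input yields nil, which skips the manifest.json entry.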
func splitLicenses(input string) []string {
if input == "" {
return nil
}
	return strings.Split(input, ",")
}
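
// writeGzipProgress periodically reports read and upload throughput. rp counts
// uncompressed bytes flowing into the tar stream and wp counts compressed
// bytes leaving the gzip writer; reporting stops once the humanized read total
// matches the humanized disk size.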
func writeGzipProgress(start time.Time, size int64, rp, wp *progress) {
time.Sleep(5 * time.Second)
var oldUpload int64
var oldRead int64
var oldSince int64
totalSize := humanize.IBytes(uint64(size))
for {
rp.lock.Lock()
rpTotal := rp.total
rp.lock.Unlock()
wp.lock.Lock()
wpTotal := wp.total
wp.lock.Unlock()
since := int64(time.Since(start).Seconds())
diskSpd := humanize.IBytes(uint64((rpTotal - oldRead) / (since - oldSince)))
upldSpd := humanize.IBytes(uint64((wpTotal - oldUpload) / (since - oldSince)))
uploadTotal := humanize.IBytes(uint64(wpTotal))
readTotal := humanize.IBytes(uint64(rpTotal))
if readTotal == totalSize {
return
}
fmt.Printf("GCEExport: Read %s of %s (%s/sec),", readTotal, totalSize, diskSpd)
fmt.Printf(" total written size: %s (%s/sec)\n", uploadTotal, upldSpd)
oldUpload = wpTotal
oldRead = rpTotal
oldSince = since
time.Sleep(30 * time.Second)
}
}
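
// gcsClient builds a storage client over a tuned HTTP transport with generous
// connection reuse; as a side effect it also sets the global log prefix.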
func gcsClient(ctx context.Context, oauth string) (domain.StorageClientInterface, error) {
log.SetPrefix(logPrefix + " ")
logger := logging.NewToolLogger(logPrefix)
baseTransport := &http.Transport{
DisableKeepAlives: false,
MaxIdleConns: 0,
MaxIdleConnsPerHost: 1000,
MaxConnsPerHost: 0,
IdleConnTimeout: 60 * time.Second,
ResponseHeaderTimeout: 5 * time.Second,
TLSHandshakeTimeout: 5 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
transport, err := htransport.NewTransport(ctx, baseTransport)
if err != nil {
return nil, err
}
return storageutils.NewStorageClient(ctx, logger, option.WithHTTPClient(&http.Client{Transport: transport}),
option.WithCredentialsFile(oauth))
}
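
// gzipDisk wraps the raw disk (preceded by a manifest.json when licenses are
// given) in a GNU tar stream, gzips it at the configured level, and writes the
// result to writer, logging progress along the way.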
func gzipDisk(file *os.File, size int64, writer io.WriteCloser) error {
wp := &progress{}
gw, err := gzip.NewWriterLevel(io.MultiWriter(wp, writer), *level)
if err != nil {
return err
}
rp := &progress{}
tw := tar.NewWriter(io.MultiWriter(rp, gw))
ls := splitLicenses(*licenses)
if ls != nil {
fmt.Printf("GCEExport: Creating gzipped image of %q with licenses %q.\n", file.Name(), ls)
} else {
fmt.Printf("GCEExport: Creating gzipped image of %q.\n", file.Name())
}
start := time.Now()
if ls != nil {
type lsJSON struct {
Licenses []string `json:"licenses"`
}
body, err := json.Marshal(lsJSON{Licenses: ls})
if err != nil {
return err
}
if err := tw.WriteHeader(&tar.Header{
Name: "manifest.json",
Mode: 0600,
Size: int64(len(body)),
Format: tar.FormatGNU,
}); err != nil {
return err
}
if _, err := tw.Write(body); err != nil {
return err
}
}
if err := tw.WriteHeader(&tar.Header{
Name: "disk.raw",
Mode: 0600,
Size: size,
Format: tar.FormatGNU,
}); err != nil {
return err
}
go writeGzipProgress(start, size, rp, wp)
if _, err := io.CopyN(tw, file, size); err != nil {
return err
}
if err := tw.Close(); err != nil {
return err
}
if err := gw.Close(); err != nil {
return err
}
if err := writer.Close(); err != nil {
return err
}
	since := time.Since(start)
	seconds := int64(since.Seconds())
	if seconds < 1 {
		seconds = 1 // avoid division by zero for sub-second exports
	}
	spd := humanize.IBytes(uint64(size / seconds))
	ratio := size / wp.total
	log.Printf("GCEExport: Finished creating gzipped image of %q in %s [%s/s] with a compression ratio of %d.", file.Name(), since, spd, ratio)
return nil
}
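
// stream compresses src to gs://bkt/obj. With a buffer prefix it stages
// compressed chunks on local disk for parallel upload; otherwise it streams
// directly to a single GCS object writer.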
func stream(ctx context.Context, src *os.File, size int64, prefix, bkt, obj string) error {
fmt.Printf("GCEExport: Copying %q to gs://%s/%s.\n", src.Name(), bkt, obj)
if prefix != "" {
bs, err := humanize.ParseBytes(*bufferSize)
if err != nil {
return err
}
prefix, err := filepath.Abs(prefix)
if err != nil {
return err
}
buf := storageutils.NewBufferedWriter(ctx, int64(bs), int64(*workers), gcsClient, *oauth, prefix, bkt, obj, "GCEExport")
fmt.Printf("GCEExport: Using %q as the buffer prefix, %s as the buffer size, and %d as the number of workers.\n", prefix, humanize.IBytes(bs), *workers)
return gzipDisk(src, size, buf)
}
client, err := gcsClient(ctx, *oauth)
if err != nil {
return err
}
w := client.GetObject(bkt, obj).NewWriter()
fmt.Println("GCEExport: No local cache set, streaming directly to GCS.")
return gzipDisk(src, size, w)
}
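
// main validates flags, opens the disk, confirms with the user, and exports.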
func main() {
flag.Parse()
ctx := context.Background()
if *gcsPath == "" {
log.Fatal("The flag -gcs_path must be provided")
}
if *disk == "" {
log.Fatal("The flag -disk must be provided")
}
bkt, obj, err := storageutils.GetGCSObjectPathElements(*gcsPath)
if err != nil {
log.Fatal(err)
}
file, err := os.Open(*disk)
if err != nil {
log.Fatal(err)
}
defer file.Close()
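	// diskLength returns the disk size in bytes. It is not defined in this
	// file; it lives in a platform-specific file elsewhere in this package.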
size, err := diskLength(file)
if err != nil {
log.Fatal(err)
}
fmt.Printf("GCEExport: Disk %s is %s, compressed size will most likely be much smaller.\n", *disk, humanize.IBytes(uint64(size)))
if !*noconfirm {
fmt.Print("Continue? (y/N): ")
var c string
fmt.Scanln(&c)
c = strings.ToLower(c)
if c != "y" && c != "yes" {
fmt.Println("Aborting")
os.Exit(0)
}
}
fmt.Println("GCEExport: Beginning export process...")
start := time.Now()
if err := stream(ctx, file, size, *bufferPrefix, bkt, obj); err != nil {
log.Fatal(err)
}
fmt.Println("GCEExport: Finished export in ", time.Since(start))
}