ste/downloader-blobFS.go

// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ste

import (
	"errors"
	"os"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"

	"github.com/Azure/azure-storage-azcopy/v10/common"
)

type blobFSDownloader struct {
	jptm          IJobPartTransferMgr
	txInfo        *TransferInfo
	srcFileClient *file.Client
}

func newBlobFSDownloader(jptm IJobPartTransferMgr) (downloader, error) {
	s, err := jptm.SrcServiceClient().DatalakeServiceClient()
	if err != nil {
		return nil, err
	}
	srcFileClient := s.NewFileSystemClient(jptm.Info().SrcContainer).NewFileClient(jptm.Info().SrcFilePath)

	return &blobFSDownloader{srcFileClient: srcFileClient}, nil
}

func (bd *blobFSDownloader) Prologue(jptm IJobPartTransferMgr) {
	bd.jptm = jptm
	bd.txInfo = jptm.Info() // Inform the downloader
}

func (bd *blobFSDownloader) Epilogue() {
	if bd.jptm != nil {
		if bd.jptm.IsLive() && bd.jptm.Info().PreservePOSIXProperties {
			bsip, err := newBlobSourceInfoProvider(bd.jptm)
			if err != nil {
				bd.jptm.FailActiveDownload("get blob source info provider", err)
			}
			unixstat, _ := bsip.(IUNIXPropertyBearingSourceInfoProvider)

			if ubd, ok := (interface{})(bd).(unixPropertyAwareDownloader); ok && unixstat.HasUNIXProperties() {
				adapter, err := unixstat.GetUNIXProperties()
				if err != nil {
					bd.jptm.FailActiveDownload("get unix properties", err)
				}

				stage, err := ubd.ApplyUnixProperties(adapter)
				if err != nil {
					bd.jptm.FailActiveDownload("set unix properties: "+stage, err)
				}
			}
		}
	}
}

// Returns a chunk-func for ADLS gen2 downloads
func (bd *blobFSDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, destWriter common.ChunkedFileWriter, id common.ChunkID, length int64, pacer pacer) chunkFunc {
	return createDownloadChunkFunc(jptm, id, func() {
		srcFileClient := bd.srcFileClient

		// At this point we create an HTTP(S) request for the desired portion of the file, and
		// wait until we get the headers back... but we have not yet read its whole body.
		// The Download method encapsulates any retries that may be necessary to get to the point of receiving response headers.
		jptm.LogChunkStatus(id, common.EWaitReason.HeaderResponse())
		get, err := srcFileClient.DownloadStream(jptm.Context(), &file.DownloadStreamOptions{
			Range: &file.HTTPRange{Offset: id.OffsetInFile(), Count: length},
		})
		if err != nil {
			jptm.FailActiveDownload("Downloading response body", err) // cancel entire transfer because this chunk has failed
			return
		}

		// parse the remote lmt; there shouldn't be any error, unless the service returned a new format
		getLMT := get.LastModified.In(time.FixedZone("GMT", 0))
		if !getLMT.Equal(jptm.LastModifiedTime().In(time.FixedZone("GMT", 0))) {
			jptm.FailActiveDownload("BFS File modified during transfer", errors.New("BFS File modified during transfer"))
		}

		// step 2: Enqueue the response body to be written out to disk
		// The retryReader encapsulates any retries that may be necessary while downloading the body
		jptm.LogChunkStatus(id, common.EWaitReason.Body())
		retryReader := get.NewRetryReader(jptm.Context(), &file.RetryReaderOptions{
			MaxRetries:   MaxRetryPerDownloadBody,
			OnFailedRead: common.NewDatalakeReadLogFunc(jptm, srcFileClient.DFSURL()),
		})
		defer retryReader.Close()

		err = destWriter.EnqueueChunk(jptm.Context(), id, length, newPacedResponseBody(jptm.Context(), retryReader, pacer), true)
		if err != nil {
			jptm.FailActiveDownload("Enqueuing chunk", err)
			return
		}
	})
}

func (bd *blobFSDownloader) CreateSymlink(jptm IJobPartTransferMgr) error {
	sip, err := newBlobSourceInfoProvider(jptm)
	if err != nil {
		return err
	}
	symsip := sip.(ISymlinkBearingSourceInfoProvider) // blob always implements this
	symlinkInfo, _ := symsip.ReadLink()

	// create the link
	err = os.Symlink(symlinkInfo, jptm.Info().Destination)

	return err
}
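
Note: GenerateDownloadFunc receives each chunk's position via id.OffsetInFile() and the length argument, and simply forwards them as a file.HTTPRange; the chunk boundaries themselves are computed by the calling/scheduling code elsewhere in AzCopy. The following is a minimal, self-contained sketch of how such (offset, length) ranges could be derived for a file under an assumed fixed chunk size. The chunkRange type and splitRanges helper are hypothetical names introduced only for illustration and are not part of the AzCopy codebase.

package main

import "fmt"

// chunkRange is a hypothetical stand-in for the (offset, length) pair that each
// chunk-func sees via common.ChunkID.OffsetInFile() and the length parameter.
type chunkRange struct {
	Offset int64
	Length int64
}

// splitRanges divides a file of fileSize bytes into consecutive ranges of at
// most chunkSize bytes each; the final range may be shorter.
func splitRanges(fileSize, chunkSize int64) []chunkRange {
	var ranges []chunkRange
	for offset := int64(0); offset < fileSize; offset += chunkSize {
		length := chunkSize
		if remaining := fileSize - offset; remaining < chunkSize {
			length = remaining
		}
		ranges = append(ranges, chunkRange{Offset: offset, Length: length})
	}
	return ranges
}

func main() {
	// A 20 MiB file split with an 8 MiB chunk size yields 8 MiB, 8 MiB, and 4 MiB ranges.
	for _, r := range splitRanges(20*1024*1024, 8*1024*1024) {
		fmt.Printf("offset=%d length=%d\n", r.Offset, r.Length)
	}
}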