common/singleChunkReader.go (240 lines of code) (raw):

// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package common

import (
	"context"
	"errors"
	"hash"
	"io"
	"sync"
)

// Reader of ONE chunk of a file. May be used to re-read multiple times (e.g. if
// we must retry the sending of the chunk).
// An instance of this type cannot be used by multiple threads (since its Read/Seek are inherently stateful).
// The reader can throw away the data after each successful read, and then re-read it from disk if there
// is a need to retry the transmission of the chunk. That saves us the RAM cost of having to keep every
// transmitted chunk in RAM until acknowledged by the service. We just re-read if the service says we need to retry.
// Although there's a time (performance) cost in the re-read, that's fine in a retry situation because the retry
// indicates we were going too fast for the service anyway.
type SingleChunkReader interface {

	// ReadSeeker is used to read the contents of the chunk, and because the sending pipeline seeks at various times
	io.ReadSeeker

	// Closer is needed to clean up resources
	io.Closer

	// BlockingPrefetch tries to read the full contents of the chunk into RAM.
	BlockingPrefetch(fileReader io.ReaderAt, isRetry bool) error

	// GetPrologueState is used to grab enough of the initial bytes to do MIME-type detection. Expected to be called only
	// on the first chunk in each file (since there's no point in calling it on others).
	// There is deliberately no error return value from the Prologue.
	// If it failed, the Prologue itself must call jptm.FailActiveSend.
	GetPrologueState() PrologueState

	// Length is the number of bytes in the chunk
	Length() int64

	// HasPrefetchedEntirelyZeros gives an indication of whether this chunk is entirely zeros. If it returns true
	// then the chunk content has been prefetched AND it was all zeroes. For some remote destinations that support "sparse file"
	// semantics, it is safe and correct to skip the upload of those chunks where this returns true.
	// In the rare edge case where this returns false due to the prefetch having failed (rather than the contents being non-zero),
	// we'll just treat it as a non-zero chunk. That's simpler (to code, to review and to test) than having this code force a prefetch.
	HasPrefetchedEntirelyZeros() bool

	// WriteBufferTo writes the entire contents of the prefetched buffer to h.
	// Panics if the internal buffer has not been prefetched (or if it's been discarded after a complete Read)
	WriteBufferTo(h hash.Hash)
}

// Simple aggregation of existing io interfaces
type CloseableReaderAt interface {
	io.ReaderAt
	io.Closer
}

// Factory method for data source for singleChunkReader
type ChunkReaderSourceFactory func() (CloseableReaderAt, error)

func DocumentationForDependencyOnChangeDetection() {
	// This function does nothing, except remind you to read the following, which is essential
	// to the correctness of AzCopy.

	// *** If the code that calls this singleChunkReader reads to the end of the buffer, then
	// closeBuffer will automatically be called. If the calling code subsequently Seeks back to
	// the start, and Reads again, then singleChunkReader will re-retrieve the data from disk.
	// That's exactly what happens if a chunk upload fails, and AzCopy's HTTP pipeline does a retry.
	//
	// Here's the problem: There is no guarantee that the data obtained from the re-read matches the
	// data that was retrieved the first time. It might be different. Specifically, the data from the
	// second disk read might be different from the first if some other process modified the file.
	// And, importantly, AzCopy uses only the FIRST version when computing the MD5 hash of the file.
	//
	// So if the file changed, we'll upload the new version, but use the hash from the old one. That's
	// clearly unacceptable because the hash will be invalid.
	//
	// To solve that, we rely on our change detection logic in ste/xfer-anyToRemote-file.go/epilogueWithCleanupSendToRemote.
	// If some other process has changed the file, then that change detection logic will kick in,
	// and fail the transfer. Therefore no _successful_ transfer will suffer from
	// a re-read chunk being different from what was hashed.
	//
	// Search for usages of this function DocumentationForDependencyOnChangeDetection(), to see all places that
	// have been "commented" with a "call" to it.
	//
	// Why do we re-read from disk like this? Because if we didn't we'd have to keep every chunk in RAM until the
	// Storage Service acknowledged each request, and we assume that would substantially increase RAM usage.
	// Why do we have a function just for documentation? Because (a) the function gives us a compiler-checked
	// way to refer to the documentation from all relevant places, (b) IDEs can find usages of the function to
	// see where it's referenced and (c) this is so important to the correctness of AzCopy, that it seemed sensible
	// to do something that was hard to ignore.
}

type singleChunkReader struct {
	// context used to allow cancellation of blocking operations
	// (Yes, ideally contexts are not stored in structs, but we need it inside Read, and there's no way for it to be passed in there)
	ctx context.Context

	// pool of byte slices (to avoid constant GC)
	slicePool ByteSlicePooler

	// used to track the count of bytes that are (potentially) in RAM
	cacheLimiter CacheLimiter

	// for logging chunk state transitions
	chunkLogger ChunkStatusLogger

	// general-purpose logger
	generalLogger ILogger

	// A factory to get hold of the file, in case we need to re-read any of it
	sourceFactory ChunkReaderSourceFactory

	// chunkId includes this chunk's start position (offset) in file
	chunkId ChunkID

	// number of bytes in this chunk
	length int64

	// position for Seek/Read
	positionInChunk int64

	// buffer used by prefetch
	buffer []byte

	// muMaster locks everything for single-threaded use...
	muMaster *sync.Mutex

	// ...
	// except muMaster doesn't lock Close(), which can be called at the same time as reads (pipeline.Do calls it in cases where the context has been cancelled).
	// It could be argued that we only need muClose (since that's the only case where we knowingly call two methods at the same time - Close while a Read is in progress -
	// but it seems cleaner to also lock overall with muMaster rather than making weird assumptions about how we are called - concurrently or not)
	muClose  *sync.Mutex
	isClosed bool
}

func NewSingleChunkReader(ctx context.Context, sourceFactory ChunkReaderSourceFactory, chunkId ChunkID, length int64, chunkLogger ChunkStatusLogger, generalLogger ILogger, slicePool ByteSlicePooler, cacheLimiter CacheLimiter) SingleChunkReader {
	if length <= 0 {
		return &emptyChunkReader{}
	}
	return &singleChunkReader{
		muMaster:      &sync.Mutex{},
		muClose:       &sync.Mutex{},
		ctx:           ctx,
		chunkLogger:   chunkLogger,
		generalLogger: generalLogger,
		slicePool:     slicePool,
		cacheLimiter:  cacheLimiter,
		sourceFactory: sourceFactory,
		chunkId:       chunkId,
		length:        length,
	}
}

// use acquires both locks, always in the order muMaster then muClose
func (cr *singleChunkReader) use() {
	cr.muMaster.Lock()
	cr.muClose.Lock()
}

// unuse releases both locks, in the reverse order of use
func (cr *singleChunkReader) unuse() {
	cr.muClose.Unlock()
	cr.muMaster.Unlock()
}

func (cr *singleChunkReader) HasPrefetchedEntirelyZeros() bool {
	cr.use()
	defer cr.unuse()

	if cr.buffer == nil {
		return false // not prefetched (and, to simplify error handling in the caller, we don't call retryBlockingPrefetchIfNecessary here)
	}

	for _, b := range cr.buffer {
		if b != 0 {
			return false // it's not all zeroes
		}
	}
	return true

	// note: we are not using this optimization: int64Slice := (*(*[]int64)(unsafe.Pointer(&rangeBytes)))[:len(rangeBytes)/8]
	// Why? Because (a) it only works when chunk size is divisible by 8, and that's not universally the case (e.g. last chunk in a file)
	// and (b) some sources seem to imply that the middle of it should be &rangeBytes[0] instead of just &rangeBytes, so we'd want to
	// check out the pros and cons of using the [0] before using it.
	// and (c) we would want to check whether it really did offer meaningful real-world performance gain, before introducing use of unsafe.
}

func (cr *singleChunkReader) BlockingPrefetch(fileReader io.ReaderAt, isRetry bool) error {
	cr.use()
	defer cr.unuse()

	return cr.blockingPrefetch(fileReader, isRetry)
}

// Prefetch the data in this chunk, using a file reader that is provided to us.
// (Allowing the caller to provide the reader to us allows a sequential read approach, since the caller can control the order sequentially (in the initial, non-retry, scenario).)
// We use io.ReaderAt, rather than io.Reader, just for maintainability/ensuring correctness. (Since just using Reader requires the caller to
// follow certain assumptions about positioning the file pointer at the right place before calling us, but using ReaderAt does not).
func (cr *singleChunkReader) blockingPrefetch(fileReader io.ReaderAt, isRetry bool) error {
	if cr.buffer != nil {
		return nil // already prefetched
	}

	// Block until we successfully add cr.length bytes to the app's current RAM allocation.
	// Must use "relaxed" RAM limit IFF this is a retry. Else, we can, in theory, get deadlock with all active goroutines blocked
	// here doing retries, but no RAM _will_ become available because it's
	// all used by queued chunkfuncs (that can't be processed because all goroutines are active).
	if cr.chunkLogger != nil {
		cr.chunkLogger.LogChunkStatus(cr.chunkId, EWaitReason.RAMToSchedule())
	}
	err := cr.cacheLimiter.WaitUntilAdd(cr.ctx, cr.length, func() bool { return isRetry })
	if err != nil {
		return err
	}

	// prepare to read
	if cr.chunkLogger != nil {
		cr.chunkLogger.LogChunkStatus(cr.chunkId, EWaitReason.DiskIO())
	}
	targetBuffer := cr.slicePool.RentSlice(cr.length)

	// read WITHOUT holding the "close" lock. While we don't have the lock, we mutate ONLY local variables, no instance state.
	// (Don't release the other lock, muMaster, since that's unnecessary and would make it harder to reason about behaviour - e.g.
	// is something other than Close happening?)
	cr.muClose.Unlock()
	n, readErr := fileReader.ReadAt(targetBuffer, cr.chunkId.OffsetInFile())
	cr.muClose.Lock()

	// now that we have the lock again, see if any error means we can't continue
	if readErr == nil {
		if cr.isClosed {
			readErr = errors.New("closed while reading")
		} else if cr.ctx.Err() != nil {
			readErr = cr.ctx.Err() // context cancelled
		} else if int64(n) != cr.length {
			readErr = errors.New("bytes read not equal to expected length. Chunk reader must be constructed so that it won't read past end of file")
		}
	}

	// return the revised error, if any
	if readErr != nil {
		cr.returnSlice(targetBuffer)
		return readErr
	}

	// We can continue, so use the data we have read
	cr.buffer = targetBuffer
	return nil
}

func (cr *singleChunkReader) retryBlockingPrefetchIfNecessary() error {
	if cr.buffer != nil {
		return nil // nothing to do
	}

	// create a new reader for the file (since anything that was passed to our Prefetch routine before was, deliberately, not kept)
	sourceFile, err := cr.sourceFactory()
	if err != nil {
		return err
	}
	defer sourceFile.Close()

	// no need to seek first, because it's a ReaderAt
	const isRetry = true // retries are the only time we need to redo the prefetch
	return cr.blockingPrefetch(sourceFile, isRetry)
}

// Seeks within this chunk
// Seeking is used for retries, and also by some code to get length (by seeking to end).
func (cr *singleChunkReader) Seek(offset int64, whence int) (int64, error) {
	DocumentationForDependencyOnChangeDetection() // <-- read the documentation here

	cr.use()
	defer cr.unuse()

	newPosition := cr.positionInChunk

	switch whence {
	case io.SeekStart:
		newPosition = offset
	case io.SeekCurrent:
		newPosition += offset
	case io.SeekEnd:
		// Per the io.Seeker contract, the offset is relative to the end, so it is added (it is normally zero or negative)
		newPosition = cr.length + offset
	}

	if newPosition < 0 {
		return 0, errors.New("cannot seek to before beginning")
	}
	if newPosition > cr.length {
		newPosition = cr.length
	}

	cr.positionInChunk = newPosition
	return cr.positionInChunk, nil
}

// Reads from within this chunk.
func (cr *singleChunkReader) Read(p []byte) (n int, err error) {
	DocumentationForDependencyOnChangeDetection() // <-- read the documentation here

	cr.use()
	defer cr.unuse()

	// This is a normal read, so free the prefetch buffer when we hit EOF (i.e. end of this chunk).
	// We do so on the assumption that if we've read to the end we don't need the prefetched data any longer.
	// (If later, there's a retry that forces seek back to start and re-read, we'll automatically trigger a re-fetch at that time)
	return cr.doRead(p, true)
}

func (cr *singleChunkReader) doRead(p []byte, freeBufferOnEof bool) (n int, err error) {
	// check for EOF, BEFORE we ensure prefetch
	// (Otherwise, some readers can call us after EOF, and we end up re-pre-fetching unnecessarily)
	if cr.positionInChunk >= cr.length {
		return 0, io.EOF
	}

	// Always use the prefetch logic to read the data
	// This is simpler to maintain than using a different code path for the (rare) cases
	// where there has been no prefetch before this routine is called
	err = cr.retryBlockingPrefetchIfNecessary()
	if err != nil {
		return 0, err
	}

	// extra checks to be safe (originally for https://github.com/Azure/azure-storage-azcopy/issues/191)
	// No longer needed now that use/unuse lock with a mutex, but there's no harm in leaving them here
	if cr.buffer == nil {
		panic("unexpected nil buffer")
	}
	if cr.positionInChunk >= cr.length {
		panic("unexpected EOF")
	}
	if cr.length != int64(len(cr.buffer)) {
		panic("unexpected buffer length discrepancy")
	}

	// Copy the data across
	bytesCopied := copy(p, cr.buffer[cr.positionInChunk:])
	cr.positionInChunk += int64(bytesCopied)

	// check for EOF
	isEof := cr.positionInChunk >= cr.length
	if isEof {
		if freeBufferOnEof {
			cr.closeBuffer()
		}
		return bytesCopied, io.EOF
	}

	return bytesCopied, nil
}

// Disposes of the buffer to save RAM.
func (cr *singleChunkReader) closeBuffer() {
	DocumentationForDependencyOnChangeDetection() // <-- read the documentation here

	if cr.buffer == nil {
		return
	}
	cr.returnSlice(cr.buffer)
	cr.buffer = nil
}

// returnSlice gives the slice back to the pool and releases its bytes from the cache limiter
func (cr *singleChunkReader) returnSlice(slice []byte) {
	cr.slicePool.ReturnSlice(slice)
	cr.cacheLimiter.Remove(int64(len(slice)))
}

func (cr *singleChunkReader) Length() int64 {
	cr.use()
	defer cr.unuse()

	return cr.length
}

// Some code paths can call this, when cleaning up. (Even though in the normal, non-error, code path, we don't NEED this
// because we close at the completion of a successful read of the whole prefetch buffer.
// We still want this though, to handle cases where for some reason the transfer stops before all the buffer has been read.)
// Without this close, if something failed part way through, we would keep counting this object's bytes in cacheLimiter
// "for ever", even after the object is gone.
func (cr *singleChunkReader) Close() error {
	// First, check and log early closes
	// This check originates from issue 191. Even though we think we've now resolved that issue,
	// we'll keep this code just to make sure.
	if cr.positionInChunk < cr.length && cr.ctx.Err() == nil {
		cr.generalLogger.Log(LogInfo, "Early close of chunk in singleChunkReader with context still active")
		// cannot panic here, since this code path is NORMAL in the case of sparse files to Azure Files and Page Blobs
	}

	// Only acquire the Close mutex (it will be free if the prefetch method is in the middle of a disk read)
	// Don't acquire muMaster, which will not be free in that situation
	cr.muClose.Lock()
	defer cr.muClose.Unlock()

	// do the real work
	cr.closeBuffer()
	cr.isClosed = true

	/*
	 * Set chunkLogger to nil, so that chunkStatusLogger can be GC'ed.
	 *
	 * TODO: We should not need to explicitly set this to nil but today we have a yet-unknown ref on cr which
	 * is leaking this "big" chunkStatusLogger memory, so we cause that to be freed by force dropping this ref.
	 *
	 * Note: We are force setting this to nil, and we safeguard against this by checking that chunkLogger is not nil at the respective places.
	 * At present chunkLogger is used only in blockingPrefetch().
	 */
	cr.chunkLogger = nil

	return nil
}

// Grab the leading bytes, for later MIME type recognition
// (else we would have to re-read the start of the file later, and that breaks our rule to use sequential
// reads as much as possible)
func (cr *singleChunkReader) GetPrologueState() PrologueState {
	cr.use()
	// can't defer unuse here. See explicit calls (plural) below

	const mimeRecognitionLen = 512
	leadingBytes := make([]byte, mimeRecognitionLen)
	n, err := cr.doRead(leadingBytes, false) // do NOT free buffer on EOF, so that if it's a very small file, and we hit the end, we won't needlessly discard the prefetched data
	if err != nil && err != io.EOF {
		cr.unuse()
		return PrologueState{} // empty return value, because we just can't sniff the mime type
	}
	if n < len(leadingBytes) {
		// truncate if we read less than expected (very small file, so err was EOF above)
		leadingBytes = leadingBytes[:n]
	}
	// unuse before Seek, since Seek is public
	cr.unuse()
	// MUST rewind, so that the bytes we read will get transferred too!
	_, _ = cr.Seek(0, io.SeekStart)
	return PrologueState{LeadingBytes: leadingBytes}
}

// Writes the buffer to a hasher. Does not alter positionInChunk
func (cr *singleChunkReader) WriteBufferTo(h hash.Hash) {
	DocumentationForDependencyOnChangeDetection() // <-- read the documentation here

	cr.use()
	defer cr.unuse()

	if cr.buffer == nil {
		panic("invalid state. No prefetch buffer is present")
	}
	_, err := h.Write(cr.buffer)
	if err != nil {
		panic("documentation of hash.Hash.Write says it will never return an error")
	}
}