oss/io_utils.go (687 lines of code) (raw):

package oss

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"
)

// LimitedReadCloser couples an io.LimitedReader (which caps the number of
// readable bytes) with the Closer of the underlying stream, so Close still
// reaches the original ReadCloser.
type LimitedReadCloser struct {
	*io.LimitedReader
	io.Closer
}

// NewLimitedReadCloser wraps rc so that at most limit bytes can be read from
// it. A negative limit means "no limit": rc is returned unchanged.
func NewLimitedReadCloser(rc io.ReadCloser, limit int64) io.ReadCloser {
	if limit < 0 {
		return rc
	}
	return &LimitedReadCloser{
		LimitedReader: &io.LimitedReader{R: rc, N: limit},
		Closer:        rc,
	}
}

// ReadSeekNopCloser wraps r in a ReadSeekerNopClose whose Close is a no-op.
func ReadSeekNopCloser(r io.Reader) ReadSeekerNopClose {
	return ReadSeekerNopClose{r}
}

// ReadSeekerNopClose adapts an arbitrary io.Reader to the ReadSeekCloser
// shape. Seek silently does nothing when the wrapped reader cannot seek;
// Close is always a no-op.
type ReadSeekerNopClose struct {
	r io.Reader
}

// Read forwards to the wrapped reader. If the wrapped value is nil the type
// switch matches nothing and (0, nil) is returned instead of panicking.
func (r ReadSeekerNopClose) Read(p []byte) (int, error) {
	switch t := r.r.(type) {
	case io.Reader:
		return t.Read(p)
	}
	return 0, nil
}

// Seek forwards to the wrapped reader when it implements io.Seeker;
// otherwise it reports position 0 with no error (a silent no-op).
func (r ReadSeekerNopClose) Seek(offset int64, whence int) (int64, error) {
	switch t := r.r.(type) {
	case io.Seeker:
		return t.Seek(offset, whence)
	}
	return int64(0), nil
}

// Close is a no-op; the wrapped reader is never closed through this type.
func (r ReadSeekerNopClose) Close() error {
	return nil
}

// IsSeeker reports whether the wrapped reader supports Seek.
func (r ReadSeekerNopClose) IsSeeker() bool {
	_, ok := r.r.(io.Seeker)
	return ok
}

// HasLen returns the wrapped reader's Len() and true when it exposes a
// Len() int method (e.g. bytes.Reader, strings.Reader); otherwise (0, false).
func (r ReadSeekerNopClose) HasLen() (int, bool) {
	type lenner interface {
		Len() int
	}
	if lr, ok := r.r.(lenner); ok {
		return lr.Len(), true
	}
	return 0, false
}

// GetLen returns the number of unread bytes, preferring a cheap Len() call
// and falling back to seeking to the end and back. Returns -1 when the
// length cannot be determined.
func (r ReadSeekerNopClose) GetLen() (int64, error) {
	if l, ok := r.HasLen(); ok {
		return int64(l), nil
	}
	if s, ok := r.r.(io.Seeker); ok {
		return seekerLen(s)
	}
	return -1, nil
}

// seekerLen measures the bytes remaining after the current position by
// seeking to the end and restoring the original offset afterwards.
func seekerLen(s io.Seeker) (int64, error) {
	curOffset, err := s.Seek(0, io.SeekCurrent)
	if err != nil {
		return 0, err
	}
	endOffset, err := s.Seek(0, io.SeekEnd)
	if err != nil {
		return 0, err
	}
	_, err = s.Seek(curOffset, io.SeekStart)
	if err != nil {
		return 0, err
	}
	return endOffset - curOffset, nil
}

// isReaderSeekable reports whether r can seek, unwrapping the
// ReadSeekerNopClose adapter (whose Seek method exists even when the inner
// reader cannot actually seek).
func isReaderSeekable(r io.Reader) bool {
	switch v := r.(type) {
	case ReadSeekerNopClose:
		return v.IsSeeker()
	case *ReadSeekerNopClose:
		return v.IsSeeker()
	case io.ReadSeeker:
		return true
	default:
		return false
	}
}

// GetReaderLen returns the number of unread bytes in r via Len() or by
// seeking, or -1 when neither is possible.
func GetReaderLen(r io.Reader) int64 {
	type lenner interface {
		Len() int
	}
	if lr, ok := r.(lenner); ok {
		return int64(lr.Len())
	}
	if s, ok := r.(io.Seeker); ok {
		if l, err := seekerLen(s); err == nil {
			return l
		}
	}
	return -1
}

// buffer is one unit of data exchanged between the async fetch goroutine
// and AsyncRangeReader.Read. err records the read error (if any) that
// terminated the fill of buf.
type buffer struct {
	buf    []byte
	err    error
	offset int
}

// isEmpty reports whether all bytes in the buffer have been consumed.
// A nil buffer is considered empty.
func (b *buffer) isEmpty() bool {
	if b == nil {
		return true
	}
	if len(b.buf)-b.offset <= 0 {
		return true
	}
	return false
}

// read fills the buffer from rd (see readFill), truncates buf to the bytes
// actually read, rewinds the consume offset and records/returns the error.
func (b *buffer) read(rd io.Reader) error {
	var n int
	n, b.err = readFill(rd, b.buf)
	b.buf = b.buf[0:n]
	b.offset = 0
	return b.err
}

// buffer returns the unconsumed portion of the data.
func (b *buffer) buffer() []byte {
	return b.buf[b.offset:]
}

// increment marks n more bytes as consumed.
func (b *buffer) increment(n int) {
	b.offset += n
}

const (
	// AsyncReadeBufferSize is the full size of each prefetch buffer.
	AsyncReadeBufferSize = 1024 * 1024
	// softStartInitial is the first read size; it is doubled per fetch
	// until AsyncReadeBufferSize is reached (soft-start ramp-up).
	softStartInitial = 512 * 1024
)

// ReaderRangeGetOutput is the result of one ranged GET issued through a
// ReaderRangeGetFn.
type ReaderRangeGetOutput struct {
	Body          io.ReadCloser
	ContentLength int64
	ContentRange  *string
	ETag          *string
	LastModified  *time.Time
}

// ReaderRangeGetFn fetches one byte range of the source object.
type ReaderRangeGetFn func(context.Context, HTTPRange) (output *ReaderRangeGetOutput, err error)

// AsyncRangeReader prefetches ranges of an object into a bounded set of
// buffers using a background goroutine, and serves them through Read.
type AsyncRangeReader struct {
	in      io.ReadCloser // Input reader
	ready   chan *buffer  // Buffers ready to be handed to the reader
	token   chan struct{} // Tokens which allow a buffer to be taken
	exit    chan struct{} // Closes when finished
	buffers int           // Number of buffers
	err     error         // If an error has occurred it is here
	cur     *buffer       // Current buffer being served
	exited  chan struct{} // Channel is closed when the async reader shuts down
	size    int           // size of buffer to use
	closed  bool          // whether we have closed the underlying stream
	mu      sync.Mutex    // lock for Read/WriteTo/Abandon/Close

	//Range Getter
	rangeGet  ReaderRangeGetFn
	httpRange HTTPRange // advances as data is fetched

	// For reader
	offset       int64
	gotsize      int64
	oriHttpRange HTTPRange // the originally requested range, kept for resume math

	context context.Context
	cancel  context.CancelFunc

	// Origin file pattern
	etag string
	// NOTE(review): modTime appears never to be assigned in this file —
	// confirm whether it is still needed.
	modTime string
}

// NewAsyncRangeReader returns a reader that will asynchronously read from
// the Reader returned by getter from the given offset into a number of buffers each of size AsyncReadeBufferSize
// The input can be read from the returned reader.
// When done use Close to release the buffers and close the supplied input.
// The etag is used to identify the content of the object. If not set, the first ETag returned value will be used instead.
func NewAsyncRangeReader(ctx context.Context, rangeGet ReaderRangeGetFn, httpRange *HTTPRange, etag string, buffers int) (*AsyncRangeReader, error) {
	if buffers <= 0 {
		return nil, errors.New("number of buffers too small")
	}
	if rangeGet == nil {
		return nil, errors.New("nil reader supplied")
	}
	context, cancel := context.WithCancel(ctx)
	range_ := HTTPRange{}
	if httpRange != nil {
		range_ = *httpRange
	}
	a := &AsyncRangeReader{
		rangeGet:     rangeGet,
		context:      context,
		cancel:       cancel,
		httpRange:    range_,
		oriHttpRange: range_,
		offset:       range_.Offset,
		gotsize:      0,
		etag:         etag,
		buffers:      buffers,
	}
	//fmt.Printf("NewAsyncRangeReader, range: %s, etag:%s, buffer count:%v\n", ToString(a.httpRange.FormatHTTPRange()), a.etag, a.buffers)
	a.init(buffers)
	return a, nil
}

// init sets up the channel plumbing and starts the background fetch
// goroutine. The token channel bounds how many buffers may be in flight;
// the goroutine consumes a token per buffer it fills and sends the filled
// buffer on ready. Read returns tokens as buffers are drained.
func (a *AsyncRangeReader) init(buffers int) {
	a.ready = make(chan *buffer, buffers)
	a.token = make(chan struct{}, buffers)
	a.exit = make(chan struct{})
	a.exited = make(chan struct{})
	a.buffers = buffers
	a.cur = nil
	a.size = softStartInitial

	// Create tokens
	for i := 0; i < buffers; i++ {
		a.token <- struct{}{}
	}

	// Start async reader
	go func() {
		// Ensure that when we exit this is signalled.
		defer close(a.exited)
		defer close(a.ready)
		for {
			select {
			case <-a.token:
				b := a.getBuffer()
				// Soft start: begin with a smaller buffer and double the
				// size each round until the full buffer size is reached.
				if a.size < AsyncReadeBufferSize {
					b.buf = b.buf[:a.size]
					a.size <<= 1
				}
				// Requested byte count already satisfied: publish an EOF
				// buffer and stop.
				if a.httpRange.Count > 0 && a.gotsize > a.httpRange.Count {
					b.buf = b.buf[0:0]
					b.err = io.EOF
					//fmt.Printf("a.gotsize > a.httpRange.Count, err:%v\n", b.err)
					a.ready <- b
					return
				}
				// No open stream (first round, or a previous read error
				// closed it): issue a ranged GET for the remaining span.
				if a.in == nil {
					httpRangeRemains := a.httpRange
					if a.httpRange.Count > 0 {
						gotNum := a.httpRange.Offset - a.oriHttpRange.Offset
						if gotNum > 0 && a.httpRange.Count > gotNum {
							httpRangeRemains.Count = a.httpRange.Count - gotNum
						}
					}
					output, err := a.rangeGet(a.context, httpRangeRemains)
					if err == nil {
						// Lock onto the first observed ETag, then insist the
						// object has not changed between range requests.
						etag := ToString(output.ETag)
						if a.etag == "" {
							a.etag = etag
						}
						if etag != a.etag {
							err = fmt.Errorf("Source file is changed, expect etag:%s ,got etag:%s", a.etag, etag)
						}
						// Partial Response check
						var off int64
						if output.ContentRange == nil {
							off = 0
						} else {
							off, _, _, _ = ParseContentRange(*output.ContentRange)
						}
						if off != httpRangeRemains.Offset {
							err = fmt.Errorf("Range get fail, expect offset:%v, got offset:%v", httpRangeRemains.Offset, off)
						}
					}
					if err != nil {
						// Publish the error through an empty buffer and stop.
						b.buf = b.buf[0:0]
						b.err = err
						if output != nil && output.Body != nil {
							output.Body.Close()
						}
						//fmt.Printf("call getFunc fail, err:%v\n", err)
						a.ready <- b
						return
					}
					body := output.Body
					if httpRangeRemains.Count > 0 {
						body = NewLimitedReadCloser(output.Body, httpRangeRemains.Count)
					}
					a.in = body
					//fmt.Printf("call getFunc done, range:%s\n", ToString(httpRangeRemains.FormatHTTPRange()))
				}
				// ignore err from read
				err := b.read(a.in)
				a.httpRange.Offset += int64(len(b.buf))
				a.gotsize += int64(len(b.buf))
				// A non-EOF read error is cleared here: the stream is closed
				// below (a.in = nil) and the next round re-opens the range at
				// the updated offset, i.e. transient read errors trigger a
				// resume rather than surfacing to the consumer.
				if err != io.EOF {
					b.err = nil
				}
				//fmt.Printf("read into buffer, size:%v, next begin:%v, err:%v\n", len(b.buf), a.httpRange.Offset, err)
				a.ready <- b
				if err != nil {
					a.in.Close()
					a.in = nil
					if err == io.EOF {
						return
					}
				}
			case <-a.exit:
				return
			}
		}
	}()
}

// fill ensures a.cur holds an unconsumed buffer, recycling the exhausted one
// (and returning its token so the goroutine can fill another) and blocking
// on the ready channel for the next. A closed ready channel means the
// goroutine has stopped; a stored error (or "stream abandoned") is returned.
func (a *AsyncRangeReader) fill() (err error) {
	if a.cur.isEmpty() {
		if a.cur != nil {
			a.putBuffer(a.cur)
			a.token <- struct{}{}
			a.cur = nil
		}
		b, ok := <-a.ready
		if !ok {
			// Return an error to show fill failed
			if a.err == nil {
				return errors.New("stream abandoned")
			}
			return a.err
		}
		a.cur = b
	}
	return nil
}

// Read will return the next available data.
func (a *AsyncRangeReader) Read(p []byte) (n int, err error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	defer func() {
		a.offset += int64(n)
	}()
	// Swap buffer and maybe return error
	err = a.fill()
	if err != nil {
		return 0, err
	}
	// Copy what we can
	n = copy(p, a.cur.buffer())
	a.cur.increment(n)
	// If at end of buffer, return any error, if present
	if a.cur.isEmpty() {
		a.err = a.cur.err
		return n, a.err
	}
	return n, nil
}

// Offset returns the absolute offset of the next byte Read will deliver.
func (a *AsyncRangeReader) Offset() int64 {
	return a.offset
}

// Close stops the background goroutine, releases all buffers and closes the
// underlying stream (once).
func (a *AsyncRangeReader) Close() (err error) {
	a.abandon()
	if a.closed {
		return nil
	}
	a.closed = true
	if a.in != nil {
		err = a.in.Close()
	}
	return
}

// abandon stops the goroutine and returns every outstanding buffer to the
// pool; draining ready is safe because stop() waits for the goroutine to
// close it.
func (a *AsyncRangeReader) abandon() {
	a.stop()
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.cur != nil {
		a.putBuffer(a.cur)
		a.cur = nil
	}
	for b := range a.ready {
		a.putBuffer(b)
	}
}

// stop cancels the context, signals the goroutine to exit and waits for it.
// Idempotent: a second call observes the closed exit channel and returns.
func (a *AsyncRangeReader) stop() {
	select {
	case <-a.exit:
		return
	default:
	}
	a.cancel()
	close(a.exit)
	<-a.exited
}

// bufferPool is a global pool of buffers
var bufferPool *sync.Pool
var bufferPoolOnce sync.Once

// TODO use pool
// putBuffer restores the byte slice to full capacity and returns it to the
// shared pool; the *buffer wrapper itself is discarded.
func (a *AsyncRangeReader) putBuffer(b *buffer) {
	b.buf = b.buf[0:cap(b.buf)]
	bufferPool.Put(b.buf)
}

// getBuffer lazily initialises the shared pool, then hands out a buffer
// backed by a pooled AsyncReadeBufferSize byte slice.
func (a *AsyncRangeReader) getBuffer() *buffer {
	bufferPoolOnce.Do(func() {
		// Initialise the buffer pool when used
		bufferPool = &sync.Pool{
			New: func() any {
				//fmt.Printf("make([]byte, BufferSize)\n")
				return make([]byte, AsyncReadeBufferSize)
			},
		}
	})
	return &buffer{
		buf: bufferPool.Get().([]byte),
	}
}

// readFill reads from r until buf is full or an error occurs (like
// io.ReadFull, but it tolerates readers that return short counts and only
// stops on error).
func readFill(r io.Reader, buf []byte) (n int, err error) {
	var nn int
	for n < len(buf) && err == nil {
		nn, err = r.Read(buf[n:])
		n += nn
	}
	return n, err
}

// MultiBytesReader A Reader implements the io.Reader, io.Seeker interfaces by reading from multi byte slice.
type MultiBytesReader struct {
	s    [][]byte
	i    int64 // current reading index
	size int64 // total bytes across all slices
	rbuf int   // index of the slice currently being read
	rp   int   // read position within s[rbuf]
}

// Len returns the number of bytes of the unread portion of the slice.
func (r *MultiBytesReader) Len() int {
	if r.i >= r.size {
		return 0
	}
	return int(r.size - r.i)
}

// Size returns the original length of the underlying byte slice.
func (r *MultiBytesReader) Size() int64 {
	return r.size
}

// Read implements the io.Reader interface.
// It loops over the underlying slices until b is full or the data is
// exhausted; a final io.EOF from the inner read is suppressed so that only
// the next call reports EOF.
func (r *MultiBytesReader) Read(b []byte) (n int, err error) {
	if r.i >= r.size {
		return 0, io.EOF
	}
	var nn int
	for n < len(b) && err == nil {
		nn, err = r.read(b[n:])
		n += nn
	}
	if err == io.EOF {
		err = nil
	}
	return n, err
}

// read copies bytes from the current slice only, advancing rbuf/rp and the
// absolute index i. Callers loop to span slice boundaries.
func (r *MultiBytesReader) read(b []byte) (n int, err error) {
	if r.i >= r.size {
		return 0, io.EOF
	}
	// Current slice exhausted: move to the next one.
	//if r.rp == cap(r.s[r.rbuf]) {
	if r.rp == len(r.s[r.rbuf]) {
		r.rbuf++
		r.rp = 0
	}
	if r.rbuf == len(r.s) {
		err = io.EOF
		return
	} else if r.rbuf > len(r.s) {
		// Defensive: should be unreachable since rbuf only increments by one.
		return 0, fmt.Errorf("read overflow, rbuf:%d, buf len%d", r.rbuf, len(r.s))
	}
	n = copy(b, r.s[r.rbuf][r.rp:])
	r.rp += n
	r.i += int64(n)
	return
}

// Seek implements the io.Seeker interface.
// After computing the absolute position it re-derives rbuf/rp via updateRp.
// Seeking past the end is allowed (subsequent reads return io.EOF).
func (r *MultiBytesReader) Seek(offset int64, whence int) (int64, error) {
	var abs int64
	switch whence {
	case io.SeekStart:
		abs = offset
	case io.SeekCurrent:
		abs = r.i + offset
	case io.SeekEnd:
		abs = r.size + offset
	default:
		return 0, errors.New("MultiSliceReader.Seek: invalid whence")
	}
	if abs < 0 {
		return 0, errors.New("MultiSliceReader.Seek: negative position")
	}
	r.i = abs
	r.updateRp()
	return abs, nil
}

// Reset resets the Reader to be reading from b.
func (r *MultiBytesReader) Reset(b [][]byte) { n := MultiBytesReader{ s: b, i: 0, } n.size = int64(r.calcSize(n.s)) n.updateRp() *r = n } func (r *MultiBytesReader) calcSize(b [][]byte) int { size := 0 for i := 0; i < len(b); i++ { size += len(r.s[i]) } return size } func (r *MultiBytesReader) updateRp() { remains := r.i rbuf := 0 for remains > 0 && rbuf < len(r.s) { slen := int64(len(r.s[rbuf])) if remains < slen { break } rbuf++ remains -= slen } r.rbuf = rbuf r.rp = int(remains) } // NewReader returns a new Reader reading from b. func NewMultiBytesReader(b [][]byte) *MultiBytesReader { r := &MultiBytesReader{ s: b, i: 0, } r.size = int64(r.calcSize(r.s)) r.updateRp() return r } type RangeReader struct { in io.ReadCloser // Input reader closed bool // whether we have closed the underlying stream //Range Getter rangeGet ReaderRangeGetFn httpRange HTTPRange // For reader offset int64 oriHttpRange HTTPRange context context.Context // Origin file pattern etag string modTime *time.Time totalSize int64 } // NewRangeReader returns a reader that will read from the Reader returued by getter from the given offset. // The etag is used to identify the content of the object. If not set, the first ETag returned value will be used instead. func NewRangeReader(ctx context.Context, rangeGet ReaderRangeGetFn, httpRange *HTTPRange, etag string) (*RangeReader, error) { if rangeGet == nil { return nil, errors.New("nil reader supplied") } range_ := HTTPRange{} if httpRange != nil { range_ = *httpRange } a := &RangeReader{ rangeGet: rangeGet, context: ctx, httpRange: range_, oriHttpRange: range_, offset: range_.Offset, etag: etag, } //fmt.Printf("NewRangeReader, range: %s, etag:%s\n", ToString(a.httpRange.FormatHTTPRange()), a.etag) return a, nil } // Read will return the next available data. 
// Read will return the next available data.
func (r *RangeReader) Read(p []byte) (n int, err error) {
	// Track consumption on both the caller-visible offset and the range
	// cursor used to resume after a re-open.
	defer func() {
		r.offset += int64(n)
		r.httpRange.Offset += int64(n)
	}()
	n, err = r.read(p)
	return
}

// read opens (or re-opens) the ranged stream on demand, validates the
// response (ETag stability and returned offset), then reads from it.
// Non-EOF read errors close the stream and are swallowed so the next call
// retries by issuing a fresh range GET at the current offset.
func (r *RangeReader) read(p []byte) (int, error) {
	if r.closed {
		return 0, fmt.Errorf("RangeReader is closed")
	}
	// open stream
	if r.in == nil {
		httpRangeRemains := r.httpRange
		if r.httpRange.Count > 0 {
			gotNum := r.httpRange.Offset - r.oriHttpRange.Offset
			if gotNum > 0 && r.httpRange.Count > gotNum {
				httpRangeRemains.Count = r.httpRange.Count - gotNum
			}
		}
		output, err := r.rangeGet(r.context, httpRangeRemains)
		if err == nil {
			// Lock onto the first observed ETag, then insist the object has
			// not changed between range requests.
			etag := ToString(output.ETag)
			if r.etag == "" {
				r.etag = etag
				r.modTime = output.LastModified
			}
			if etag != r.etag {
				err = fmt.Errorf("Source file is changed, expect etag:%s ,got etag:%s", r.etag, etag)
			}
			// Partial Response check
			var off int64
			if output.ContentRange == nil {
				off = 0
				r.totalSize = output.ContentLength
			} else {
				off, _, r.totalSize, _ = ParseContentRange(*output.ContentRange)
			}
			if off != httpRangeRemains.Offset {
				err = fmt.Errorf("Range get fail, expect offset:%v, got offset:%v", httpRangeRemains.Offset, off)
			}
		}
		if err != nil {
			if output != nil && output.Body != nil {
				output.Body.Close()
			}
			return 0, err
		}
		body := output.Body
		if httpRangeRemains.Count > 0 {
			body = NewLimitedReadCloser(output.Body, httpRangeRemains.Count)
		}
		r.in = body
	}
	// read from stream
	// ignore error when reading from stream
	n, err := r.in.Read(p)
	if err != nil && err != io.EOF {
		r.in.Close()
		r.in = nil
		err = nil
	}
	return n, err
}

// Offset returns the absolute offset of the next byte Read will deliver.
func (r *RangeReader) Offset() int64 {
	return r.offset
}

// Close closes the underlying stream (once); later Reads fail.
func (r *RangeReader) Close() (err error) {
	if r.closed {
		return nil
	}
	r.closed = true
	if r.in != nil {
		err = r.in.Close()
	}
	return
}

// TeeReadNopCloser returns a Reader that writes to w what it reads from r.
// All reads from r performed through it are matched with
// corresponding writes to w. There is no internal buffering -
// the write must complete before the read completes.
// Any error encountered while writing is reported as a read error. func TeeReadNopCloser(reader io.Reader, writers ...io.Writer) io.ReadCloser { return &teeReadNopCloser{ reader: reader, writers: writers, mark: -1, } } type teeReadNopCloser struct { reader io.Reader writers []io.Writer mark int64 } func (t *teeReadNopCloser) Read(p []byte) (n int, err error) { n, err = t.reader.Read(p) if n > 0 { for _, w := range t.writers { if nn, err := w.Write(p[:n]); err != nil { return nn, err } } } return } func (t *teeReadNopCloser) Seek(offset int64, whence int) (int64, error) { switch t := t.reader.(type) { case io.Seeker: return t.Seek(offset, whence) } return int64(0), nil } func (t *teeReadNopCloser) Close() error { return nil } // IsSeekable tests if this reader supports Seek method. func (t *teeReadNopCloser) IsSeekable() bool { _, ok := t.reader.(io.Seeker) return ok } // MarkSupported tests if this reader supports the Mark and Reset methods. func (t *teeReadNopCloser) MarkSupported() bool { return t.IsSeekable() } // Mark marks the current position in this reader. A subsequent call to // the Reset method repositions this reader at the last marked position // so that subsequent reads re-read the same bytes. func (t *teeReadNopCloser) Mark() { if s, ok := t.reader.(io.Seeker); ok { if pos, err := s.Seek(0, io.SeekCurrent); err == nil { t.mark = pos } } } // Reset repositions this stream to the position at the time // the Mark method was last called on this reader. 
func (t *teeReadNopCloser) Reset() error { if !t.MarkSupported() { return fmt.Errorf("Mark/Reset not supported") } if t.mark < 0 { return fmt.Errorf("Mark is not called yet") } // seek to the last marked position if s, ok := t.reader.(io.Seeker); ok { if _, err := s.Seek(t.mark, io.SeekStart); err != nil { return err } } // reset writer type reseter interface { Reset() } for _, w := range t.writers { if rw, ok := w.(reseter); ok { rw.Reset() } } return nil } type DiscardReadCloser struct { RC io.ReadCloser Discard int } func (drc *DiscardReadCloser) Read(b []byte) (int, error) { n, err := drc.RC.Read(b) if drc.Discard == 0 || n <= 0 { return n, err } if n <= drc.Discard { drc.Discard -= n return 0, err } realLen := n - drc.Discard copy(b[0:realLen], b[drc.Discard:n]) drc.Discard = 0 return realLen, err } func (drc *DiscardReadCloser) Close() error { closer, ok := drc.RC.(io.ReadCloser) if ok { return closer.Close() } return nil }