func()

in oss/io_utils.go [252:356]


// init sets up the reader's channels and counters and starts the background
// goroutine that reads ahead into buffers.
//
// buffers is the read-ahead depth: it is the capacity of both a.ready and
// a.token and the number of tokens issued up front. The goroutine consumes one
// token for every buffer it fills and publishes the buffer on a.ready.
// NOTE(review): tokens are presumably handed back by the consumer once it is
// done with a buffer; that code is outside this block.
//
// The goroutine stops when it has published a buffer carrying io.EOF, when
// opening the range fails (the error is published on the buffer), or when
// a.exit becomes readable. On the way out it closes a.ready and then a.exited.
func (a *AsyncRangeReader) init(buffers int) {
	a.ready = make(chan *buffer, buffers)
	a.token = make(chan struct{}, buffers)
	a.exit = make(chan struct{})
	a.exited = make(chan struct{})
	a.buffers = buffers
	a.cur = nil
	a.size = softStartInitial

	// Create tokens
	for i := 0; i < buffers; i++ {
		a.token <- struct{}{}
	}

	// Start async reader
	go func() {
		// Ensure that when we exit this is signalled. Deferred calls run in
		// reverse order, so a.ready is closed before a.exited.
		defer close(a.exited)
		defer close(a.ready)
		for {
			select {
			case <-a.token:
				b := a.getBuffer()
				// Soft start: use a short buffer first and double the size on
				// each fill until it reaches AsyncReadeBufferSize.
				if a.size < AsyncReadeBufferSize {
					b.buf = b.buf[:a.size]
					a.size <<= 1
				}

				// More bytes than the bounded range asked for: report EOF.
				if a.httpRange.Count > 0 && a.gotsize > a.httpRange.Count {
					b.buf = b.buf[0:0]
					b.err = io.EOF
					a.ready <- b
					return
				}

				if a.in == nil {
					// No open body: first request, or the previous body was
					// dropped after a read error. Request what is left of the
					// range, starting at the already advanced offset.
					httpRangeRemains := a.httpRange
					if a.httpRange.Count > 0 {
						gotNum := a.httpRange.Offset - a.oriHttpRange.Offset
						if gotNum >= a.httpRange.Count {
							// The whole bounded range has already been
							// delivered (the failed read returned its last
							// bytes). Reopening here would ask for data past
							// the requested range, so finish with EOF.
							b.buf = b.buf[0:0]
							b.err = io.EOF
							a.ready <- b
							return
						}
						if gotNum > 0 {
							httpRangeRemains.Count = a.httpRange.Count - gotNum
						}
					}
					output, err := a.rangeGet(a.context, httpRangeRemains)
					if err == nil {
						// Remember the ETag of the first response and require
						// every later response to carry the same one.
						etag := ToString(output.ETag)
						if a.etag == "" {
							a.etag = etag
						}
						if etag != a.etag {
							err = fmt.Errorf("Source file is changed, expect etag:%s ,got etag:%s", a.etag, etag)
						}

						// Partial Response check: the response must start at
						// the offset that was requested. A response without a
						// Content-Range is treated as starting at offset 0.
						var off int64
						if output.ContentRange == nil {
							off = 0
						} else {
							off, _, _, _ = ParseContentRange(*output.ContentRange)
						}
						if off != httpRangeRemains.Offset {
							err = fmt.Errorf("Range get fail, expect offset:%v, got offset:%v", httpRangeRemains.Offset, off)
						}
					}
					if err != nil {
						// Publish the failure on an empty buffer and stop.
						// The body, if any, is closed so it is not leaked.
						b.buf = b.buf[0:0]
						b.err = err
						if output != nil && output.Body != nil {
							output.Body.Close()
						}
						a.ready <- b
						return
					}
					body := output.Body
					if httpRangeRemains.Count > 0 {
						body = NewLimitedReadCloser(output.Body, httpRangeRemains.Count)
					}
					a.in = body
				}

				// ignore err from read: only io.EOF is passed on to the
				// consumer. Any other error is cleared from the buffer, and
				// the body is closed below so the next iteration reopens the
				// range at the advanced offset.
				err := b.read(a.in)
				a.httpRange.Offset += int64(len(b.buf))
				a.gotsize += int64(len(b.buf))
				if err != io.EOF {
					b.err = nil
				}
				a.ready <- b
				// b has been published; only the local err is used from here.
				if err != nil {
					a.in.Close()
					a.in = nil
					if err == io.EOF {
						return
					}
				}
			case <-a.exit:
				return
			}
		}
	}()
}