in oss/io_utils.go [252:356]
// init sets up the reader's channels and starts the background goroutine
// that fills buffers from the ranged source.
//
// buffers is the capacity of both the ready and the token channel. The token
// channel is pre-filled with that many tokens, and the goroutine consumes one
// token for every buffer it sends on ready.
//
// The goroutine stops when:
//   - a read returns io.EOF,
//   - a bounded range (httpRange.Count > 0) has been fully delivered,
//   - opening or validating a range request fails (the error is delivered in
//     the last buffer), or
//   - a.exit becomes readable.
//
// On exit it closes a.ready and then a.exited.
func (a *AsyncRangeReader) init(buffers int) {
	a.ready = make(chan *buffer, buffers)
	a.token = make(chan struct{}, buffers)
	a.exit = make(chan struct{})
	a.exited = make(chan struct{})
	a.buffers = buffers
	a.cur = nil
	a.size = softStartInitial

	// Create tokens
	for i := 0; i < buffers; i++ {
		a.token <- struct{}{}
	}

	// Start async reader
	go func() {
		// Ensure that when we exit this is signalled. Deferred calls run in
		// reverse order, so ready is closed before exited.
		defer close(a.exited)
		defer close(a.ready)
		for {
			select {
			case <-a.token:
				b := a.getBuffer()
				// Soft start: hand out short buffers first and double the
				// length each time until AsyncReadeBufferSize is reached.
				if a.size < AsyncReadeBufferSize {
					b.buf = b.buf[:a.size]
					a.size <<= 1
				}

				// A bounded range is finished as soon as Count bytes have been
				// delivered. Using >= (not >) matters when a non-EOF read error
				// arrives together with the final bytes: the body is dropped
				// with gotsize == Count, and without this check a new request
				// would be issued starting past the end of the requested range.
				if a.httpRange.Count > 0 && a.gotsize >= a.httpRange.Count {
					b.buf = b.buf[0:0]
					b.err = io.EOF
					a.ready <- b
					if a.in != nil {
						a.in.Close()
						a.in = nil
					}
					return
				}

				if a.in == nil {
					// No open body: request the part of the range that has not
					// been delivered yet. a.httpRange.Offset has already been
					// advanced by earlier reads, so only Count needs adjusting.
					httpRangeRemains := a.httpRange
					if a.httpRange.Count > 0 {
						gotNum := a.httpRange.Offset - a.oriHttpRange.Offset
						if gotNum > 0 && a.httpRange.Count > gotNum {
							httpRangeRemains.Count = a.httpRange.Count - gotNum
						}
					}

					output, err := a.rangeGet(a.context, httpRangeRemains)
					if err == nil {
						// The first response pins the etag; later responses
						// must carry the same one.
						etag := ToString(output.ETag)
						if a.etag == "" {
							a.etag = etag
						}

						// Partial Response check: a missing Content-Range is
						// treated as offset 0.
						// NOTE(review): the other results of ParseContentRange
						// are discarded here; confirm whether a parse failure
						// should be reported instead.
						var off int64
						if output.ContentRange != nil {
							off, _, _, _ = ParseContentRange(*output.ContentRange)
						}

						// Report the etag mismatch first so it is not replaced
						// by the offset error when both checks fail.
						switch {
						case etag != a.etag:
							err = fmt.Errorf("Source file is changed, expect etag:%s ,got etag:%s", a.etag, etag)
						case off != httpRangeRemains.Offset:
							err = fmt.Errorf("Range get fail, expect offset:%v, got offset:%v", httpRangeRemains.Offset, off)
						}
					}
					if err != nil {
						// Deliver the failure in an empty buffer and stop.
						b.buf = b.buf[0:0]
						b.err = err
						if output != nil && output.Body != nil {
							output.Body.Close()
						}
						a.ready <- b
						return
					}

					// Cap the body at the requested count for bounded ranges.
					body := output.Body
					if httpRangeRemains.Count > 0 {
						body = NewLimitedReadCloser(output.Body, httpRangeRemains.Count)
					}
					a.in = body
				}

				// Fill the buffer and account for whatever was read, even when
				// the read ended with an error.
				err := b.read(a.in)
				a.httpRange.Offset += int64(len(b.buf))
				a.gotsize += int64(len(b.buf))
				// Errors other than io.EOF are cleared from the buffer; the
				// body is dropped below so the next token issues a new range
				// request from the advanced offset.
				if err != io.EOF {
					b.err = nil
				}
				a.ready <- b
				if err != nil {
					a.in.Close()
					a.in = nil
					if err == io.EOF {
						return
					}
				}
			case <-a.exit:
				return
			}
		}
	}()
}