oss/filelike.go (628 lines of code) (raw):

package oss

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"runtime"
	"strings"
	"time"
)

// OpenOptions carries the optional settings accepted by NewReadOnlyFile.
type OpenOptions struct {
	// Offset is the initial read position within the object. Defaults to 0.
	Offset int64

	// VersionId selects a specific version of the object, if set.
	VersionId *string

	// EnablePrefetch turns on concurrent range-read prefetching for
	// sequential reads.
	EnablePrefetch bool

	// PrefetchNum is the number of concurrent prefetch range readers.
	PrefetchNum int

	// ChunkSize is the byte size fetched by each prefetch reader.
	ChunkSize int64

	// PrefetchThreshold is the number of sequentially read bytes after
	// which prefetching kicks in.
	PrefetchThreshold int64

	// RequestPayer indicates that the requester is aware that the request
	// and data download will incur costs.
	RequestPayer *string

	// OutOfOrderReadThreshold is the number of out-of-order reads
	// tolerated before prefetching is abandoned.
	OutOfOrderReadThreshold int64
}

// ReadOnlyFile is a read-only, seekable view of an OSS object.
type ReadOnlyFile struct {
	client  OpenFileAPIClient
	context context.Context

	// object info
	bucket       string
	key          string
	versionId    *string
	requestPayer *string

	// file info snapshot taken at open time (used to detect source changes)
	sizeInBytes int64
	modTime     string
	etag        string
	headers     http.Header

	// current read position
	offset int64

	// read
	reader        io.ReadCloser
	readBufOffset int64

	// prefetch
	enablePrefetch    bool
	chunkSize         int64
	prefetchNum       int
	prefetchThreshold int64
	asyncReaders      []*AsyncRangeReader

	seqReadAmount int64 // number of sequential read
	numOOORead    int64 // number of out of order read

	closed bool // whether we have closed the file

	oooReadThreshold int64
}

// NewReadOnlyFile OpenFile opens the named file for reading.
// If successful, methods on the returned file can be used for reading.
func NewReadOnlyFile(ctx context.Context, c OpenFileAPIClient, bucket string, key string, optFns ...func(*OpenOptions)) (*ReadOnlyFile, error) { options := OpenOptions{ Offset: 0, EnablePrefetch: false, PrefetchNum: DefaultPrefetchNum, ChunkSize: DefaultPrefetchChunkSize, PrefetchThreshold: DefaultPrefetchThreshold, OutOfOrderReadThreshold: DefaultOutOfOrderReadThreshold, } for _, fn := range optFns { fn(&options) } if options.EnablePrefetch { var chunkSize int64 if options.ChunkSize > 0 { chunkSize = (options.ChunkSize + AsyncReadeBufferSize - 1) / AsyncReadeBufferSize * AsyncReadeBufferSize } else { chunkSize = DefaultPrefetchChunkSize } options.ChunkSize = chunkSize if options.PrefetchNum <= 0 { options.PrefetchNum = DefaultPrefetchNum } if options.OutOfOrderReadThreshold <= int64(0) { options.OutOfOrderReadThreshold = DefaultOutOfOrderReadThreshold } } f := &ReadOnlyFile{ client: c, context: ctx, bucket: bucket, key: key, versionId: options.VersionId, requestPayer: options.RequestPayer, offset: options.Offset, enablePrefetch: options.EnablePrefetch, prefetchNum: options.PrefetchNum, chunkSize: options.ChunkSize, prefetchThreshold: options.PrefetchThreshold, oooReadThreshold: options.OutOfOrderReadThreshold, } result, err := f.client.HeadObject(f.context, &HeadObjectRequest{ Bucket: &f.bucket, Key: &f.key, VersionId: f.versionId, RequestPayer: f.requestPayer, }) if err != nil { return nil, err } //File info f.sizeInBytes = result.ContentLength f.modTime = result.Headers.Get(HTTPHeaderLastModified) f.etag = result.Headers.Get(HTTPHeaderETag) f.headers = result.Headers if f.sizeInBytes < 0 { return nil, fmt.Errorf("file size is invaid, got %v", f.sizeInBytes) } if f.offset > f.sizeInBytes { return nil, fmt.Errorf("offset is unavailable, offset:%v, file size:%v", f.offset, f.sizeInBytes) } return f, nil } // Close closes the File. 
func (f *ReadOnlyFile) Close() error { if f == nil { return os.ErrInvalid } return f.close() } func (f *ReadOnlyFile) close() error { if f.closed { return nil } if f.reader != nil { f.reader.Close() f.reader = nil } for _, reader := range f.asyncReaders { reader.Close() } f.asyncReaders = nil f.closed = true runtime.SetFinalizer(f, nil) return nil } // Read reads up to len(b) bytes from the File and stores them in b. // It returns the number of bytes read and any error encountered. // At end of file, Read returns 0, io.EOF. func (f *ReadOnlyFile) Read(p []byte) (bytesRead int, err error) { if err := f.checkValid("read"); err != nil { return 0, err } n, e := f.read(p) return n, f.wrapErr("read", e) } func (f *ReadOnlyFile) read(p []byte) (bytesRead int, err error) { defer func() { f.offset += int64(bytesRead) }() nwant := len(p) var nread int for bytesRead < nwant && err == nil { nread, err = f.readInternal(f.offset+int64(bytesRead), p[bytesRead:]) if nread > 0 { bytesRead += nread } } return } // Seek sets the offset for the next Read or Write on file to offset, interpreted // according to whence: 0 means relative to the origin of the file, 1 means // relative to the current offset, and 2 means relative to the end. // It returns the new offset and an error. 
func (f *ReadOnlyFile) Seek(offset int64, whence int) (int64, error) { if err := f.checkValid("seek"); err != nil { return 0, err } r, e := f.seek(offset, whence) if e != nil { return 0, f.wrapErr("seek", e) } return r, nil } func (f *ReadOnlyFile) seek(offset int64, whence int) (int64, error) { var abs int64 switch whence { case io.SeekStart: abs = offset case io.SeekCurrent: abs = f.offset + offset case io.SeekEnd: abs = f.sizeInBytes + offset default: return 0, fmt.Errorf("invalid whence") } if abs < 0 { return 0, fmt.Errorf("negative position") } if abs > f.sizeInBytes { return offset - (abs - f.sizeInBytes), fmt.Errorf("offset is unavailable") } f.offset = abs return abs, nil } type fileInfo struct { name string size int64 modTime time.Time header http.Header } func (fi *fileInfo) Name() string { return fi.name } func (fi *fileInfo) Size() int64 { return fi.size } func (fi *fileInfo) Mode() os.FileMode { return os.FileMode(0644) } func (fi *fileInfo) ModTime() time.Time { return fi.modTime } func (fi *fileInfo) IsDir() bool { return false } func (fi *fileInfo) Sys() any { return fi.header } // Stat returns the FileInfo structure describing file. 
func (f *ReadOnlyFile) Stat() (os.FileInfo, error) {
	if err := f.checkValid("stat"); err != nil {
		return nil, err
	}
	// A Last-Modified parse failure is tolerated; mtime is then zero.
	mtime, _ := http.ParseTime(f.modTime)
	return &fileInfo{
		name:    f.name(),
		size:    f.sizeInBytes,
		modTime: mtime,
		header:  f.headers,
	}, nil
}

// name returns a display name of the form oss://bucket/key, with a
// versionId query suffix when the file was opened against a version.
func (f *ReadOnlyFile) name() string {
	var name string
	if f.versionId != nil {
		name = fmt.Sprintf("oss://%s/%s?versionId=%s", f.bucket, f.key, *f.versionId)
	} else {
		name = fmt.Sprintf("oss://%s/%s", f.bucket, f.key)
	}
	return name
}

// wrapErr wraps err into an *os.PathError tagged with op and the file's
// name. nil and io.EOF pass through unchanged.
func (f *ReadOnlyFile) wrapErr(op string, err error) error {
	if err == nil || err == io.EOF {
		return err
	}
	return &os.PathError{Op: op, Path: f.name(), Err: err}
}

// checkValid reports os.ErrInvalid for a nil receiver and os.ErrClosed
// after Close; otherwise the file is usable. The op name is unused.
func (f *ReadOnlyFile) checkValid(_ string) error {
	if f == nil {
		return os.ErrInvalid
	} else if f.closed {
		return os.ErrClosed
	}
	return nil
}

// readInternal reads at most len(p) bytes starting at offset, choosing
// between the prefetch path and a direct range GET. It maintains the
// sequential/out-of-order read bookkeeping used to make that choice.
func (f *ReadOnlyFile) readInternal(offset int64, p []byte) (bytesRead int, err error) {
	defer func() {
		if bytesRead > 0 {
			f.readBufOffset += int64(bytesRead)
			f.seqReadAmount += int64(bytesRead)
		}
	}()
	if offset >= f.sizeInBytes {
		err = io.EOF
		return
	}
	if f.readBufOffset != offset {
		// Out-of-order read: reset the sequential counter and drop both
		// the direct reader and any prefetch readers. Only count it as
		// out-of-order when prefetchers were actually active.
		f.readBufOffset = offset
		f.seqReadAmount = 0
		if f.reader != nil {
			f.reader.Close()
			f.reader = nil
		}
		if f.asyncReaders != nil {
			f.numOOORead++
		}
		for _, ar := range f.asyncReaders {
			ar.Close()
		}
		f.asyncReaders = nil
	}
	if f.enablePrefetch && f.seqReadAmount >= f.prefetchThreshold && f.numOOORead < f.oooReadThreshold {
		// Switch to the async prefetch readers.
		if f.reader != nil {
			f.reader.Close()
			f.reader = nil
		}
		err = f.prefetch(offset, len(p))
		if err == nil {
			bytesRead, err = f.readFromPrefetcher(offset, p)
			if err == nil {
				return
			}
		}
		// fall back to read serially
		f.seqReadAmount = 0
		for _, ar := range f.asyncReaders {
			ar.Close()
		}
		f.asyncReaders = nil
	}
	bytesRead, err = f.readDirect(offset, p)
	return
}

// readDirect reads from a plain streaming GET (Range: bytes=offset-),
// opening the stream lazily and validating it against the open-time
// snapshot before use.
func (f *ReadOnlyFile) readDirect(offset int64, buf []byte) (bytesRead int, err error) {
	if offset >= f.sizeInBytes {
		return
	}
	if f.reader == nil {
		var result *GetObjectResult
		result, err = f.client.GetObject(f.context,
			&GetObjectRequest{
				Bucket:        Ptr(f.bucket),
				Key:           Ptr(f.key),
				VersionId:     f.versionId,
				Range:         Ptr(fmt.Sprintf("bytes=%d-", offset)),
				RangeBehavior: Ptr("standard"),
				RequestPayer:  f.requestPayer,
			})
		if err != nil {
			return bytesRead, err
		}
		if err = f.checkResultValid(offset, result.Headers); err != nil {
			if result != nil {
				result.Body.Close()
			}
			return bytesRead, err
		}
		f.reader = result.Body
	}
	bytesRead, err = f.reader.Read(buf)
	if err != nil {
		// Any read error (including EOF) drops the stream; the next call
		// re-opens from the current offset, so the error is swallowed here.
		f.reader.Close()
		f.reader = nil
		err = nil
	}
	return
}

// checkResultValid verifies a range-GET response: the returned range must
// start at the requested offset, and Last-Modified/ETag must match the
// snapshot taken at open time (empty values are not compared).
func (f *ReadOnlyFile) checkResultValid(offset int64, header http.Header) error {
	modTime := header.Get(HTTPHeaderLastModified)
	etag := header.Get(HTTPHeaderETag)
	gotOffset, _ := parseOffsetAndSizeFromHeaders(header)
	if gotOffset != offset {
		return fmt.Errorf("Range get fail, expect offset:%v, got offset:%v", offset, gotOffset)
	}
	if (modTime != "" && f.modTime != "" && modTime != f.modTime) ||
		(etag != "" && f.etag != "" && etag != f.etag) {
		return fmt.Errorf("Source file is changed, origin info [%v,%v], new info [%v,%v]", f.modTime, f.etag, modTime, etag)
	}
	return nil
}

// readFromPrefetcher copies bytes from the queue of async range readers
// into buf. The head reader must be positioned exactly at offset;
// exhausted readers (io.EOF) are closed and popped from the queue.
func (f *ReadOnlyFile) readFromPrefetcher(offset int64, buf []byte) (bytesRead int, err error) {
	var nread int
	for len(f.asyncReaders) != 0 {
		asyncReader := f.asyncReaders[0]
		//check offset
		if offset != asyncReader.Offset() {
			return 0, errors.New("out of order")
		}
		nread, err = asyncReader.Read(buf)
		bytesRead += nread
		if err != nil {
			if err == io.EOF {
				// This chunk is exhausted; advance to the next reader.
				asyncReader.Close()
				f.asyncReaders = f.asyncReaders[1:]
				err = nil
			} else {
				return 0, err
			}
		}
		buf = buf[nread:]
		if len(buf) == 0 {
			return
		}
		// Update offset for the next read
		offset += int64(nread)
	}
	return
}

// prefetch tops up the pool of AsyncRangeReaders so that up to
// prefetchNum chunks are scheduled ahead of offset. The needAtLeast
// argument is currently unused.
func (f *ReadOnlyFile) prefetch(offset int64, _ /*needAtLeast*/ int) (err error) {
	// Start after the last already-scheduled chunk, or at offset when
	// no readers exist yet.
	off := offset
	for _, ar := range f.asyncReaders {
		off = ar.oriHttpRange.Offset + ar.oriHttpRange.Count
	}
	for len(f.asyncReaders) < f.prefetchNum {
		remaining := f.sizeInBytes - off
		size := minInt64(remaining, f.chunkSize)
		cnt := (size + (AsyncReadeBufferSize - 1)) / AsyncReadeBufferSize
		//NewAsyncRangeReader support softStartInitial, add more buffer count to prevent connections from not being released
		if size > softStartInitial {
			acnt := (AsyncReadeBufferSize+(softStartInitial-1))/softStartInitial - 1
			cnt += int64(acnt)
		}
		if size != 0 {
			// getFn performs one range GET on behalf of the async reader.
			getFn := func(ctx context.Context, httpRange HTTPRange) (output *ReaderRangeGetOutput, err error) {
				request := &GetObjectRequest{
					Bucket:       Ptr(f.bucket),
					Key:          Ptr(f.key),
					VersionId:    f.versionId,
					RequestPayer: f.requestPayer,
				}
				rangeStr := httpRange.FormatHTTPRange()
				if rangeStr != nil {
					request.Range = rangeStr
					request.RangeBehavior = Ptr("standard")
				}
				var result *GetObjectResult
				result, err = f.client.GetObject(f.context, request)
				if err != nil {
					return nil, err
				}
				return &ReaderRangeGetOutput{
					Body:          result.Body,
					ETag:          result.ETag,
					ContentLength: result.ContentLength,
					ContentRange:  result.ContentRange,
				}, nil
			}
			ar, err := NewAsyncRangeReader(f.context, getFn, &HTTPRange{off, size}, f.etag, int(cnt))
			if err != nil {
				break
			}
			f.asyncReaders = append(f.asyncReaders, ar)
			off += size
		}
		// A short (or zero) chunk means the end of the object was reached.
		if size != f.chunkSize {
			break
		}
	}
	return nil
}

// AppendOptions carries the optional settings accepted by NewAppendFile.
type AppendOptions struct {
	// To indicate that the requester is aware that the request and data download will incur costs
	RequestPayer *string

	// The parameters when the object is first generated, supports below
	// CacheControl, ContentEncoding, Expires, ContentType, ContentType, Metadata
	// SSE's parameters, Acl, StorageClass, Tagging
	// If the object exists, ignore this parameters
	CreateParameter *AppendObjectRequest
}

// AppendOnlyFile is a write-only, append-at-end view of an OSS
// appendable object.
type AppendOnlyFile struct {
	client  AppendFileAPIClient
	context context.Context

	// object info
	bucket       string
	key          string
	requestPayer *string

	// cached FileInfo; refreshed by stat() when out of sync with offset
	info os.FileInfo

	created     bool
	createParam *AppendObjectRequest

	// current write position
	offset int64

	// running CRC64 of the object, fed back as InitHashCRC64 on append
	hashCRC64 *string

	closed bool
}

// NewAppendFile AppendFile opens or creates the named file for appending.
// If successful, methods on the returned file can be used for appending.
func NewAppendFile(ctx context.Context, c AppendFileAPIClient, bucket string, key string, optFns ...func(*AppendOptions)) (*AppendOnlyFile, error) {
	options := AppendOptions{}
	for _, fn := range optFns {
		fn(&options)
	}
	f := &AppendOnlyFile{
		client:       c,
		context:      ctx,
		bucket:       bucket,
		key:          key,
		requestPayer: options.RequestPayer,
		created:      false,
		createParam:  options.CreateParameter,
	}
	result, err := f.client.HeadObject(f.context, &HeadObjectRequest{Bucket: &f.bucket, Key: &f.key, RequestPayer: f.requestPayer})
	if err != nil {
		var serr *ServiceError
		if errors.As(err, &serr) && serr.StatusCode == 404 {
			// not found: fine, the first Write will create the object
		} else {
			return nil, err
		}
	} else {
		// An existing object must be of type Appendable to be appended to.
		if !strings.EqualFold(ToString(result.ObjectType), "Appendable") {
			return nil, errors.New("Not a appendable file")
		}
		f.offset = result.ContentLength
		f.hashCRC64 = result.HashCRC64
		f.created = true
	}
	return f, nil
}

// Write writes len(b) bytes from b to the AppendOnlyFile.
// It returns the number of bytes written and an error, if any.
// Write returns a non-nil error when n != len(b).
func (f *AppendOnlyFile) Write(b []byte) (n int, err error) {
	if err := f.checkValid("write"); err != nil {
		return 0, err
	}
	n, e := f.write(b)
	if n < 0 {
		n = 0
	}
	if e == nil && n != len(b) {
		err = io.ErrShortWrite
	}
	if e != nil {
		err = f.wrapErr("write", e)
	}
	return n, err
}

// write writes len(b) bytes to the File.
// It returns the number of bytes written and an error, if any.
// write appends b at the current position via AppendObject. On a
// PositionNotEqualToLength conflict it re-reads the server-side length:
// if the server already holds exactly these bytes (offset+len(b) equals
// the server position), the write is treated as having succeeded.
func (f *AppendOnlyFile) write(b []byte) (n int, err error) {
	offset := f.offset
	request := &AppendObjectRequest{
		Bucket:        &f.bucket,
		Key:           &f.key,
		Position:      Ptr(f.offset),
		Body:          bytes.NewReader(b),
		InitHashCRC64: f.hashCRC64,
		RequestPayer:  f.requestPayer,
	}
	f.applyCreateParamIfNeed(request)
	// A brand-new object starts its CRC64 chain from "0".
	if f.offset == 0 {
		request.InitHashCRC64 = Ptr("0")
	}
	var result *AppendObjectResult
	if result, err = f.client.AppendObject(f.context, request); err == nil {
		f.offset = result.NextPosition
		f.hashCRC64 = result.HashCRC64
		f.created = true
	} else {
		var serr *ServiceError
		if errors.As(err, &serr) && serr.Code == "PositionNotEqualToLength" {
			if position, hashcrc, ok := f.nextAppendPosition(); ok {
				if offset+int64(len(b)) == position {
					// The append actually landed; adopt server state.
					f.offset = position
					f.hashCRC64 = hashcrc
					f.created = true
					err = nil
				}
			}
		}
	}
	// Bytes written = how far the local position advanced.
	return int(f.offset - offset), err
}

// WriteFrom writes io.Reader to the File.
// It returns the number of bytes written and an error, if any.
func (f *AppendOnlyFile) WriteFrom(r io.Reader) (n int64, err error) {
	if err := f.checkValid("write"); err != nil {
		return 0, err
	}
	n, err = f.writeFrom(r)
	if err != nil {
		err = f.wrapErr("write", err)
	}
	return n, err
}

// writeFrom streams r to the object at the current position. Conflict
// recovery mirrors write(), but the consumed byte count can only be
// recomputed when r is an io.Seeker (curr-roffset bytes were read).
// NOTE(review): unlike write(), no InitHashCRC64 is set here — confirm
// whether CRC chaining is intentionally skipped for streamed appends.
func (f *AppendOnlyFile) writeFrom(r io.Reader) (n int64, err error) {
	offset := f.offset
	request := &AppendObjectRequest{
		Bucket:       &f.bucket,
		Key:          &f.key,
		Position:     Ptr(f.offset),
		Body:         r,
		RequestPayer: f.requestPayer,
	}
	f.applyCreateParamIfNeed(request)
	// Remember the reader's starting position so the number of bytes it
	// produced can be measured afterwards, when seeking is possible.
	var roffset int64
	var rs io.Seeker
	rs, ok := r.(io.Seeker)
	if ok {
		roffset, _ = rs.Seek(0, io.SeekCurrent)
	}
	var result *AppendObjectResult
	if result, err = f.client.AppendObject(f.context, request); err == nil {
		f.offset = result.NextPosition
		f.hashCRC64 = result.HashCRC64
		f.created = true
	} else {
		var serr *ServiceError
		if errors.As(err, &serr) && serr.Code == "PositionNotEqualToLength" {
			if position, hashcrc, ok := f.nextAppendPosition(); ok {
				if rs != nil {
					if curr, e := rs.Seek(0, io.SeekCurrent); e == nil {
						if offset+(curr-roffset) == position {
							f.offset = position
							f.hashCRC64 = hashcrc
							f.created = true
							err = nil
						}
					}
				}
			}
		}
	}
	return f.offset - offset, err
}

// nextAppendPosition asks the server for the object's current length and
// CRC64, used to recover from append-position conflicts.
func (f *AppendOnlyFile) nextAppendPosition() (int64, *string, bool) {
	if h, e := f.client.HeadObject(f.context, &HeadObjectRequest{Bucket: &f.bucket, Key: &f.key, RequestPayer: f.requestPayer}); e == nil {
		return h.ContentLength, h.HashCRC64, true
	}
	return 0, nil, false
}

// applyCreateParamIfNeed copies the user-supplied creation parameters
// onto request, but only for the append that creates the object
// (skipped once created, or when no CreateParameter was given).
func (f *AppendOnlyFile) applyCreateParamIfNeed(request *AppendObjectRequest) {
	if f.created || f.createParam == nil {
		return
	}
	if len(f.createParam.Acl) > 0 {
		request.Acl = f.createParam.Acl
	}
	if len(f.createParam.StorageClass) > 0 {
		request.StorageClass = f.createParam.StorageClass
	}
	request.CacheControl = f.createParam.CacheControl
	request.ContentDisposition = f.createParam.ContentDisposition
	request.ContentEncoding = f.createParam.ContentEncoding
	request.Expires = f.createParam.Expires
	request.ContentType = f.createParam.ContentType
	request.ServerSideEncryption = f.createParam.ServerSideEncryption
	request.ServerSideDataEncryption = f.createParam.ServerSideDataEncryption
	request.ServerSideEncryptionKeyId = f.createParam.ServerSideEncryptionKeyId
	request.Metadata = f.createParam.Metadata
	request.Tagging = f.createParam.Tagging
}

// wrapErr wraps an error that occurred during an operation on an open file.
// It passes io.EOF through unchanged, otherwise converts
// Wraps the error in a PathError.
func (f *AppendOnlyFile) wrapErr(op string, err error) error {
	if err == nil || err == io.EOF {
		return err
	}
	return &os.PathError{Op: op, Path: f.name(), Err: err}
}

// checkValid reports os.ErrInvalid for a nil receiver and os.ErrClosed
// after Close; otherwise the file is usable. The op name is unused.
func (f *AppendOnlyFile) checkValid(_ string) error {
	if f == nil {
		return os.ErrInvalid
	} else if f.closed {
		return os.ErrClosed
	}
	return nil
}

// name returns the display name oss://bucket/key.
func (f *AppendOnlyFile) name() string {
	return fmt.Sprintf("oss://%s/%s", f.bucket, f.key)
}

// Stat returns the FileInfo structure describing file.
func (f *AppendOnlyFile) Stat() (os.FileInfo, error) { if err := f.checkValid("stat"); err != nil { return nil, err } info, err := f.stat() if err != nil { return nil, f.wrapErr("stat", err) } return info, nil } func (f *AppendOnlyFile) stat() (os.FileInfo, error) { var err error if f.info == nil || f.info.Size() != f.offset { f.info = nil if result, err := f.client.HeadObject(f.context, &HeadObjectRequest{Bucket: &f.bucket, Key: &f.key, RequestPayer: f.requestPayer}); err == nil { f.info = &fileInfo{ name: f.name(), size: result.ContentLength, modTime: ToTime(result.LastModified), header: result.Headers, } } } return f.info, err } // Close closes the File. func (f *AppendOnlyFile) Close() error { if f == nil { return os.ErrInvalid } return f.close() } func (f *AppendOnlyFile) close() error { f.closed = true return nil }