pkg/model/binarychunk/writer.go (100 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package binarychunk import ( "fmt" "io" "os" "sync" ) type BinaryReference struct { Offset int `json:"offset"` Length int `json:"len"` Buffer int `json:"buffer"` } // LargeBinaryWriter stores text as a large binary chunk and returns BinaryReference points the buffer location. type LargeBinaryWriter interface { // Check if the specified text can fit in the buffer CanWrite(size int) bool // Write the specified text and returns the BinaryReference Write(data []byte) (*BinaryReference, error) // Read buffer from a BinaryReference Read(ref *BinaryReference) ([]byte, error) // Obtain the result binary as io.Reader GetBinary() (io.Reader, error) // Free allocated resource for the writer Dispose() error } // FileSystemBinaryWriter is a basic implementation of the LargeTextWriter. type FileSystemBinaryWriter struct { bufferIndex int maximumBufferSize int currentLength int disposed bool file *os.File fileMutex sync.Mutex } var _ LargeBinaryWriter = (*FileSystemBinaryWriter)(nil) func NewFileSystemBinaryWriter(tmpPath string, bufferIndex int, maxSize int) (*FileSystemBinaryWriter, error) { file, err := os.CreateTemp(tmpPath, "khi-") if err != nil { return nil, err } return &FileSystemBinaryWriter{ bufferIndex: bufferIndex, maximumBufferSize: maxSize, currentLength: 0, disposed: false, file: file, fileMutex: sync.Mutex{}, }, nil } func (w *FileSystemBinaryWriter) CanWrite(size int) bool { return !w.disposed && w.currentLength+size <= w.maximumBufferSize } func (w *FileSystemBinaryWriter) Write(data []byte) (*BinaryReference, error) { w.fileMutex.Lock() defer w.fileMutex.Unlock() if !w.CanWrite(len(data)) { return nil, fmt.Errorf("buffer can't write the specified length %d (current:%d,maximum:%d)", len(data), w.currentLength, w.maximumBufferSize) } _, err := w.file.Seek(int64(w.currentLength), io.SeekStart) if err != nil { return nil, err } size, err := w.file.Write(data) if err != nil { return nil, err } reference := &BinaryReference{ Buffer: w.bufferIndex, Length: size, Offset: w.currentLength, } w.currentLength += size return reference, nil } func (w *FileSystemBinaryWriter) Read(ref *BinaryReference) ([]byte, error) { if ref.Buffer != w.bufferIndex { return nil, fmt.Errorf("invalid buffer index. it's not current buffer index") } w.fileMutex.Lock() defer w.fileMutex.Unlock() _, err := w.file.Seek(int64(ref.Offset), io.SeekStart) if err != nil { return nil, err } result := make([]byte, ref.Length) _, err = w.file.Read(result) if err != nil { return nil, err } return result, err } func (w *FileSystemBinaryWriter) GetBinary() (io.Reader, error) { if w.disposed { return nil, fmt.Errorf("instance is already disposed.") } file, err := os.Open(w.file.Name()) if err != nil { return nil, err } return file, nil } func (w *FileSystemBinaryWriter) Dispose() error { if w.disposed { return fmt.Errorf("instance is already disposed.") } return w.file.Close() }