// pkg/model/binarychunk/builder.go

// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package binarychunk

import (
"context"
"crypto/md5"
"encoding/binary"
"fmt"
"io"
"sync"
"github.com/GoogleCloudPlatform/khi/pkg/common"
"github.com/GoogleCloudPlatform/khi/pkg/inspection/metadata/progress"
)

// MAXIMUM_CHUNK_SIZE is the maximum size of a single binary chunk in bytes (500MiB).
const MAXIMUM_CHUNK_SIZE = 1024 * 1024 * 500

// Builder builds a list of binary chunks from the given sequence of byte arrays.
type Builder struct {
	// tmpFolderPath is the directory where intermediate chunk files are stored.
	tmpFolderPath string
	// referenceCache maps the MD5 hash of a binary body to the reference of the chunk where it was stored.
	referenceCache *common.ShardingMap[*BinaryReference]
bufferWriters []LargeBinaryWriter
compressor Compressor
maxChunkSize int
lock sync.Mutex
}

// NewBuilder returns a new Builder that stores intermediate chunk files under tmpFolderPath and compresses them with the given compressor.
func NewBuilder(compressor Compressor, tmpFolderPath string) *Builder {
return &Builder{
tmpFolderPath: tmpFolderPath,
maxChunkSize: MAXIMUM_CHUNK_SIZE,
referenceCache: common.NewShardingMap[*BinaryReference](common.NewSuffixShardingProvider(128, 4)),
compressor: compressor,
bufferWriters: make([]LargeBinaryWriter, 0),
lock: sync.Mutex{},
}
}

// Write appends the given binary body to one of the binary chunks. If the same body was written previously, it returns the cached reference instead of writing it again.
func (b *Builder) Write(binaryBody []byte) (*BinaryReference, error) {
hash := b.calcStringHash(binaryBody)
refCache := b.referenceCache.AcquireShard(hash)
defer b.referenceCache.ReleaseShard(hash)
if data, exists := refCache[hash]; exists {
return data, nil
}
b.lock.Lock()
targetIndex := len(b.bufferWriters)
for i := 0; i < len(b.bufferWriters); i++ {
if b.bufferWriters[i].CanWrite(len(binaryBody)) {
targetIndex = i
break
}
}
if len(b.bufferWriters) <= targetIndex {
		// Due to the JavaScript ArrayBuffer size limitation, each chunk must be smaller than 1GB.
writer, err := NewFileSystemBinaryWriter(b.tmpFolderPath, len(b.bufferWriters), b.maxChunkSize)
if err != nil {
b.lock.Unlock()
return nil, err
}
b.bufferWriters = append(b.bufferWriters, writer)
}
	resultReference, err := b.bufferWriters[targetIndex].Write(binaryBody)
	if err != nil {
		// Release the lock before returning so a failed write does not leave the builder locked.
		b.lock.Unlock()
		return nil, err
	}
	b.lock.Unlock()
refCache[hash] = resultReference
return resultReference, nil
}
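
// Example (an illustrative sketch, not part of the original file): writing the same
// body twice returns the cached reference instead of storing the bytes again. The
// compressor value and the temporary directory below are placeholders.
//
//	builder := NewBuilder(someCompressor, "/tmp/khi-chunks")
//	ref1, _ := builder.Write([]byte("log line"))
//	ref2, _ := builder.Write([]byte("log line")) // same body, so ref2 == ref1 (served from the cache)
//	body, _ := builder.Read(ref1)                // body round-trips the original bytes
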
// Read returns the binary body referenced by ref from its chunk.
func (b *Builder) Read(ref *BinaryReference) ([]byte, error) {
b.lock.Lock()
defer b.lock.Unlock()
if ref.Buffer >= len(b.bufferWriters) {
		return nil, fmt.Errorf("buffer index %d is out of range", ref.Buffer)
}
bw := b.bufferWriters[ref.Buffer]
return bw.Read(ref)
}

// Build writes every binary chunk to the given writer in the KHI file format and returns the total number of bytes written.
func (b *Builder) Build(ctx context.Context, writer io.Writer, progress *progress.TaskProgress) (int, error) {
allBinarySize := 0
b.lock.Lock()
defer b.lock.Unlock()
for i, binaryWriter := range b.bufferWriters {
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil {
binaryWriter.Dispose()
b.compressor.Dispose()
return 0, err
}
default:
progress.Update(float32(i)/float32(len(b.bufferWriters)), fmt.Sprintf("Compressing binary part... %d of %d", i, len(b.bufferWriters)))
binaryReader, err := binaryWriter.GetBinary()
if err != nil {
return 0, err
}
compressedReader, err := b.compressor.CompressAll(ctx, binaryReader)
if err != nil {
return 0, err
}
readResult, err := io.ReadAll(compressedReader)
if err != nil {
return 0, err
}
			// Prefix each compressed chunk with its size as a 4-byte big-endian unsigned integer.
			sizeInBytesBinary := make([]byte, 4)
			binary.BigEndian.PutUint32(sizeInBytesBinary, uint32(len(readResult)))
if writtenSize, err := writer.Write(sizeInBytesBinary); err != nil {
return 0, err
} else {
allBinarySize += writtenSize
}
if writtenSize, err := writer.Write(readResult); err != nil {
return 0, err
} else {
allBinarySize += writtenSize
}
binaryWriter.Dispose()
}
}
b.compressor.Dispose()
return allBinarySize, nil
}
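
// Build emits one record per chunk: a 4-byte big-endian length of the compressed data
// followed by the compressed bytes themselves. A minimal reader sketch for that framing
// (illustrative only; r stands for any io.Reader over the produced output):
//
//	var sizeBuf [4]byte
//	for {
//		if _, err := io.ReadFull(r, sizeBuf[:]); err != nil {
//			break // io.EOF after the last chunk
//		}
//		compressed := make([]byte, binary.BigEndian.Uint32(sizeBuf[:]))
//		if _, err := io.ReadFull(r, compressed); err != nil {
//			break
//		}
//		// decompress `compressed` with the decompressor matching the builder's compressor
//	}
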
// calcStringHash returns the hex-encoded MD5 hash of the given bytes, used as the deduplication cache key.
func (b *Builder) calcStringHash(source []byte) string {
return fmt.Sprintf("%x", md5.Sum(source))
}