pkg/rawlogs/repository.go (101 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rawlogs
import (
"fmt"
"io"
"os"
"sort"
"sync"
"time"
)
// FilesystemRepository stores raw log records together with their timestamps.
// The record bytes are appended to a temporary file and only a small index
// entry (timestamp, offset, size) per record is kept in memory, so log sets
// that are too large to hold in memory can still be sorted by timestamp.
type FilesystemRepository struct {
// writeLock is held by Write while a record is appended.
writeLock *sync.Mutex
// file is the temporary workspace file holding the raw record bytes.
file *os.File
// exported is set by IterateInSortedOrder; Write is rejected once it is true.
exported bool
// workspaceEntries holds one index entry per record written to file.
workspaceEntries []workspaceEntry
// lastOffset is the number of bytes recorded as written to file so far.
lastOffset uint64
}
// fileSystemRepositoryIterator walks the index entries of a
// FilesystemRepository and reads each record back from the workspace file.
type fileSystemRepositoryIterator struct {
// ownerRepository is the repository whose workspace file is read.
ownerRepository *FilesystemRepository
// entries is the list of index entries to visit, in iteration order.
entries []workspaceEntry
// currentIndex is the position in entries of the next record to return.
currentIndex int
}
// workspaceEntry is the in-memory index record describing where one raw log
// record lives inside the workspace file.
type workspaceEntry struct {
// TimestampInNanos is the record's timestamp as returned by time.Time.UnixNano,
// converted to uint64; it is the sort key.
TimestampInNanos uint64
// Offset is the byte offset of the record inside the workspace file.
Offset uint64
// Size is the length of the record in bytes.
Size int
}
// Compile-time checks that the concrete types implement the interfaces they
// are meant to satisfy.
var (
	_ Repository  = (*FilesystemRepository)(nil)
	_ LogIterator = (*fileSystemRepositoryIterator)(nil)
)
// NewFilesystemRepository creates an empty FilesystemRepository backed by a
// newly created temporary file whose name starts with "khi-rawlog-".
//
// The file is created in the default temporary directory (os.TempDir, which
// honours $TMPDIR on Unix and the equivalent settings on other platforms)
// rather than a hard-coded "/tmp", which does not exist on every system.
// It returns the error from os.CreateTemp if the file cannot be created.
// Call Dispose on the returned repository to delete the file.
func NewFilesystemRepository() (*FilesystemRepository, error) {
	// An empty directory argument makes os.CreateTemp use os.TempDir().
	tmpFile, err := os.CreateTemp("", "khi-rawlog-")
	if err != nil {
		return nil, err
	}
	return &FilesystemRepository{
		writeLock:        &sync.Mutex{},
		file:             tmpFile,
		exported:         false,
		workspaceEntries: make([]workspaceEntry, 0),
		lastOffset:       0,
	}, nil
}
// Write appends one raw log record with the given timestamp to the repository.
//
// It returns an error if the repository has already been exported through
// IterateInSortedOrder, or if the record cannot be written to the workspace
// file.
func (r *FilesystemRepository) Write(timestamp time.Time, data []byte) error {
	r.writeLock.Lock()
	defer r.writeLock.Unlock()
	// Check the flag while holding the lock so there is no window between the
	// check and the append in which the state can change underneath us.
	if r.exported {
		return fmt.Errorf("unsupported operation. data is already sorted and exported")
	}
	return r.writeToWorkspace(timestamp, data)
}
// IterateInSortedOrder marks the repository as exported, sorts the index
// entries by ascending timestamp and returns an iterator positioned at the
// first record.
//
// The sort is stable, so records with identical timestamps are returned in the
// order they were written. After this call Write returns an error. The
// returned iterator shares the repository's index slice.
func (r *FilesystemRepository) IterateInSortedOrder() LogIterator {
	// Hold the write lock so an in-flight Write cannot append to the index
	// while it is being sorted.
	r.writeLock.Lock()
	defer r.writeLock.Unlock()

	r.exported = true
	sort.SliceStable(r.workspaceEntries, func(a, b int) bool {
		return r.workspaceEntries[a].TimestampInNanos < r.workspaceEntries[b].TimestampInNanos
	})
	return &fileSystemRepositoryIterator{
		ownerRepository: r,
		entries:         r.workspaceEntries,
		currentIndex:    0,
	}
}
// writeToWorkspace appends data to the end of the workspace file and records
// an index entry for it. Write calls it while holding writeLock.
//
// The entry's offset is taken from the position reported by Seek rather than
// from the running lastOffset counter. If an earlier write failed after
// emitting only part of its data, the counter would no longer match the real
// end of the file and every later record would be read from the wrong place.
//
// It returns any error from seeking or writing; in that case no index entry
// is added.
func (r *FilesystemRepository) writeToWorkspace(timestamp time.Time, data []byte) error {
	end, err := r.file.Seek(0, io.SeekEnd) // go to end of the workspace file
	if err != nil {
		return err
	}
	if _, err = r.file.Write(data); err != nil {
		return err
	}
	r.workspaceEntries = append(r.workspaceEntries, workspaceEntry{
		TimestampInNanos: uint64(timestamp.UnixNano()),
		Offset:           uint64(end),
		Size:             len(data),
	})
	// Keep the counter in step with the actual end of the file.
	r.lastOffset = uint64(end) + uint64(len(data))
	return nil
}
// Dispose releases the workspace: it closes the file handle and deletes the
// temporary file from disk. The repository and any iterators obtained from it
// must not be used afterwards.
//
// The file is removed even if closing fails. The removal error takes
// precedence; otherwise the close error, if any, is returned.
func (r *FilesystemRepository) Dispose() error {
	// Close first: an open handle leaks a descriptor, and on some platforms
	// an open file cannot be removed.
	closeErr := r.file.Close()
	if err := os.Remove(r.file.Name()); err != nil {
		return err
	}
	return closeErr
}
// Implementations for fileSystemRepositoryIterator
// HasNext reports whether at least one more record remains to be returned by
// Next.
func (i *fileSystemRepositoryIterator) HasNext() bool {
	remaining := len(i.entries) - i.currentIndex
	return remaining > 0
}
// Next reads the record at the current position from the workspace file and
// advances the iterator.
//
// It returns io.EOF once every entry has been consumed. If reading the file
// fails, the read error is returned and the position is left unchanged.
func (i *fileSystemRepositoryIterator) Next() ([]byte, error) {
	if i.currentIndex >= len(i.entries) {
		return nil, io.EOF
	}
	entry := i.entries[i.currentIndex]
	buf := make([]byte, entry.Size)
	_, err := i.ownerRepository.file.ReadAt(buf, int64(entry.Offset))
	if err != nil {
		return nil, err
	}
	i.currentIndex++
	return buf, nil
}
// Reset moves the iterator back to the first entry so that the records can be
// iterated again from the start.
func (i *fileSystemRepositoryIterator) Reset() {
i.currentIndex = 0
}