internal/fs/handle/dir_handle.go (160 lines of code) (raw):
// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handle
import (
"fmt"
"sort"
"github.com/googlecloudplatform/gcsfuse/v2/internal/fs/inode"
"github.com/googlecloudplatform/gcsfuse/v2/internal/locker"
"github.com/jacobsa/fuse"
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/fuse/fuseutil"
"golang.org/x/net/context"
)
// DirHandle is the state required for reading from directories.
type DirHandle struct {
	/////////////////////////
	// Constant data
	/////////////////////////

	// The directory inode from which listings are obtained.
	in inode.DirInode

	// Whether implicit directories should be included in listings.
	// NOTE(review): captured at construction but not read anywhere in this
	// file's visible code — presumably consumed by the listing layer; confirm.
	implicitDirs bool

	/////////////////////////
	// Mutable state
	/////////////////////////

	// Mu guards the fields below; it is created in NewDirHandle with
	// checkInvariants as its invariant-checking function.
	Mu locker.Locker

	// All entries in the directory. Populated the first time we need one.
	//
	// INVARIANT: For each i, entries[i+1].Offset == entries[i].Offset + 1
	//
	// GUARDED_BY(Mu)
	entries []fuseutil.Dirent

	// Has entries yet been populated?
	//
	// INVARIANT: If !entriesValid, then len(entries) == 0
	//
	// GUARDED_BY(Mu)
	entriesValid bool
}
// NewDirHandle creates a directory handle that obtains listings from the supplied inode.
func NewDirHandle(
	in inode.DirInode,
	implicitDirs bool) (dh *DirHandle) {
	// Build the handle around the inode, then attach an invariant-checking
	// locker keyed by the inode's GCS object name.
	dh = &DirHandle{
		in:           in,
		implicitDirs: implicitDirs,
	}
	dh.Mu = locker.New("DH."+in.Name().GcsObjectName(), dh.checkInvariants)

	return dh
}
////////////////////////////////////////////////////////////////////////
// Helpers
////////////////////////////////////////////////////////////////////////
// sortedDirents implements sort.Interface over a slice of directory
// entries, ordering them lexicographically by name.
type sortedDirents []fuseutil.Dirent

func (s sortedDirents) Len() int           { return len(s) }
func (s sortedDirents) Less(i, j int) bool { return s[i].Name < s[j].Name }
func (s sortedDirents) Swap(i, j int)      { s[j], s[i] = s[i], s[j] }
// checkInvariants panics if the handle's documented invariants do not hold.
// It is installed as the check function for dh.Mu in NewDirHandle.
func (dh *DirHandle) checkInvariants() {
	// INVARIANT: For each i, entries[i+1].Offset == entries[i].Offset + 1
	for i := 1; i < len(dh.entries); i++ {
		prevOff, curOff := dh.entries[i-1].Offset, dh.entries[i].Offset
		if curOff != prevOff+1 {
			panic(fmt.Sprintf("Unexpected offset sequence: %v, %v", prevOff, curOff))
		}
	}

	// INVARIANT: If !entriesValid, then len(entries) == 0
	if len(dh.entries) != 0 && !dh.entriesValid {
		panic("Unexpected non-empty entries slice")
	}
}
// Resolve name conflicts between file objects and directory objects (e.g. the
// objects "foo/bar" and "foo/bar/") by appending U+000A, which is illegal in
// GCS object names, to conflicting file names.
//
// Input must be sorted by name.
//
// localEntries maps names of local-only (not yet synced to GCS) files to
// their dirents; it is consulted only to drop the duplicate produced when a
// name appears both in the GCS listing and in the local map.
func fixConflictingNames(entries []fuseutil.Dirent, localEntries map[string]fuseutil.Dirent) (output []fuseutil.Dirent, err error) {
	// Sanity check.
	if !sort.IsSorted(sortedDirents(entries)) {
		err = fmt.Errorf("expected sorted input")
		return
	}
	// Examine each adjacent pair of names.
	for i := range entries {
		e := &entries[i]
		// Find the previous entry.
		if i == 0 {
			output = append(output, *e)
			continue
		}
		// NOTE: prev aliases the last element of output, so the Name mutation
		// below is reflected in the returned slice.
		prev := &output[len(output)-1]
		// Does the pair have matching names?
		if e.Name != prev.Name {
			output = append(output, *e)
			continue
		}
		// We expect exactly one to be a directory.
		eIsDir := e.Type == fuseutil.DT_Directory
		prevIsDir := prev.Type == fuseutil.DT_Directory
		if eIsDir == prevIsDir {
			if _, ok := localEntries[e.Name]; ok && !eIsDir {
				// We have found same entry in GCS and local file entries, i.e, the
				// entry is uploaded to GCS but not yet deleted from local entries.
				// Do not return the duplicate entry as part of list response.
				continue
			} else {
				// Two same-named entries of the same type that are not the
				// local/GCS duplicate case: the listing is malformed.
				err = fmt.Errorf(
					"weird dirent type pair for name %q: %v, %v",
					e.Name,
					e.Type,
					prev.Type)
				return
			}
		}
		// Repair whichever is not the directory.
		if eIsDir {
			prev.Name += inode.ConflictingFileNameSuffix
		} else {
			e.Name += inode.ConflictingFileNameSuffix
		}
		output = append(output, *e)
	}
	return
}
// readAllEntries reads the complete listing for the directory — GCS entries
// plus the supplied local (not yet synced) file entries — with conflicting
// names repaired and Offset/Inode fields filled in.
//
// LOCKS_REQUIRED(in)
func readAllEntries(
	ctx context.Context,
	in inode.DirInode,
	localEntries map[string]fuseutil.Dirent) (entries []fuseutil.Dirent, err error) {
	// Page through the GCS listing one batch at a time, accumulating results.
	for tok := ""; ; {
		var batch []fuseutil.Dirent
		batch, tok, err = in.ReadEntries(ctx, tok)
		if err != nil {
			err = fmt.Errorf("ReadEntries: %w", err)
			return
		}

		entries = append(entries, batch...)

		// An empty continuation token means we've seen everything.
		if tok == "" {
			break
		}
	}

	// Merge in local file entries that haven't been synced to GCS.
	for _, localEntry := range localEntries {
		entries = append(entries, localEntry)
	}

	// fixConflictingNames requires sorted input. It also removes the
	// duplicate produced when a local file has been synced to GCS but not
	// yet deleted from the local file map, so the kernel never sees the
	// same name twice.
	sort.Sort(sortedDirents(entries))
	if entries, err = fixConflictingNames(entries, localEntries); err != nil {
		err = fmt.Errorf("fixConflictingNames: %w", err)
		return
	}

	// Offsets are dense and 1-based, matching the invariant on
	// DirHandle.entries.
	for i := range entries {
		entries[i].Offset = fuseops.DirOffset(i + 1)
	}

	// Return a bogus inode ID for each entry, but not the root inode ID.
	//
	// NOTE: As far as I can tell this is harmless. Minting and returning a
	// real inode ID is difficult because fuse does not count readdir as an
	// operation that increases the inode ID's lookup count, so we'd never
	// get a forget for it — and we'd rather not remember every inode ID
	// ever minted for readdir. If this turns out not to be harmless, we'd
	// need something like (object name, generation)-hash inode IDs, with
	// the attendant birthday-problem and generation-change concerns.
	for i := range entries {
		entries[i].Inode = fuseops.RootInodeID + 1
	}

	return
}
// ensureEntries populates dh.entries from the inode (merged with the given
// local file entries) and marks the cache valid.
//
// LOCKS_REQUIRED(dh.Mu)
// LOCKS_EXCLUDED(dh.in)
func (dh *DirHandle) ensureEntries(ctx context.Context, localFileEntries map[string]fuseutil.Dirent) (err error) {
	dh.in.Lock()
	defer dh.in.Unlock()

	// Fetch the full listing.
	entries, readErr := readAllEntries(ctx, dh.in, localFileEntries)
	if readErr != nil {
		err = fmt.Errorf("readAllEntries: %w", readErr)
		return
	}

	// Cache it on the handle.
	dh.entries = entries
	dh.entriesValid = true

	return
}
////////////////////////////////////////////////////////////////////////
// Public interface
////////////////////////////////////////////////////////////////////////
// ReadDir handles a request to read from the directory, without responding.
//
// Special case: we assume that a zero offset indicates that rewinddir has been
// called (since fuse gives us no way to intercept and know for sure), and
// start the listing process over again.
//
// LOCKS_REQUIRED(dh.Mu)
// LOCKS_EXCLUDED(dh.in)
func (dh *DirHandle) ReadDir(
	ctx context.Context,
	op *fuseops.ReadDirOp,
	localFileEntries map[string]fuseutil.Dirent) (err error) {
	// Offset zero means this is either the first call or a rewinddir:
	// discard any cached listing and start over.
	if op.Offset == 0 {
		dh.entries = nil
		dh.entriesValid = false
	}

	// Fetch the listing from GCS if we don't have a valid cached copy.
	if !dh.entriesValid {
		if err = dh.ensureEntries(ctx, localFileEntries); err != nil {
			return
		}
	}

	// An offset past the end of what we have buffered must be an invalid
	// seekdir according to posix.
	start := int(op.Offset)
	if start > len(dh.entries) {
		err = fuse.EINVAL
		return
	}

	// Copy out entries until we run out of entries or of space in the
	// destination buffer.
	for _, entry := range dh.entries[start:] {
		n := fuseutil.WriteDirent(op.Dst[op.BytesRead:], entry)
		if n == 0 {
			break
		}

		op.BytesRead += n
	}

	return
}