func()

in internal/fs/inode/dir.go [663:772]


func (d *dirInode) readObjects(
	ctx context.Context,
	tok string) (cores map[Name]*Core, newTok string, err error) {
	if d.isBucketHierarchical() {
		d.includeFoldersAsPrefixes = true
	}
	// Ask the bucket to list some objects.
	req := &gcs.ListObjectsRequest{
		Delimiter:                "/",
		IncludeTrailingDelimiter: true,
		Prefix:                   d.Name().GcsObjectName(),
		ContinuationToken:        tok,
		MaxResults:               MaxResultsForListObjectsCall,
		// Setting Projection param to noAcl since fetching owner and acls are not
		// required.
		ProjectionVal:            gcs.NoAcl,
		IncludeFoldersAsPrefixes: d.includeFoldersAsPrefixes,
	}

	listing, err := d.bucket.ListObjects(ctx, req)
	if err != nil {
		err = fmt.Errorf("ListObjects: %w", err)
		return
	}

	cores = make(map[Name]*Core)
	defer func() {
		now := d.cacheClock.Now()
		for fullName, c := range cores {
			d.cache.Insert(now, path.Base(fullName.LocalName()), c.Type())
		}
	}()

	for _, o := range listing.MinObjects {
		// Skip empty results or the directory object backing this inode.
		if o.Name == d.Name().GcsObjectName() || o.Name == "" {
			continue
		}

		nameBase := path.Base(o.Name) // ie. "bar" from "foo/bar/" or "foo/bar"

		// Given the alphabetical order of the objects, if a file "foo" and
		// directory "foo/" coexist, the directory would eventually occupy
		// the value of records["foo"].
		if strings.HasSuffix(o.Name, "/") {
			// In a hierarchical bucket, create a folder entry instead of a minObject for each prefix.
			// This is because in a hierarchical bucket, every directory is considered a folder.
			// Adding folder entries while looping to through CollapsedRuns instead of here to avoid duplicate entries.
			if !d.isBucketHierarchical() {
				dirName := NewDirName(d.Name(), nameBase)
				explicitDir := &Core{
					Bucket:    d.Bucket(),
					FullName:  dirName,
					MinObject: o,
				}
				cores[dirName] = explicitDir
			}
		} else {
			fileName := NewFileName(d.Name(), nameBase)
			file := &Core{
				Bucket:    d.Bucket(),
				FullName:  fileName,
				MinObject: o,
			}
			cores[fileName] = file
		}
	}

	// Return an appropriate continuation token, if any.
	newTok = listing.ContinuationToken

	if !d.implicitDirs && !d.isBucketHierarchical() {
		return
	}

	// Add implicit directories into the result.
	unsupportedPrefixes := []string{}
	for _, p := range listing.CollapsedRuns {
		pathBase := path.Base(p)
		if storageutil.IsUnsupportedObjectName(p) {
			unsupportedPrefixes = append(unsupportedPrefixes, p)
		}
		dirName := NewDirName(d.Name(), pathBase)
		if d.isBucketHierarchical() {
			folder := gcs.Folder{Name: dirName.objectName}

			folderCore := &Core{
				Bucket:   d.Bucket(),
				FullName: dirName,
				Folder:   &folder,
			}
			cores[dirName] = folderCore
		} else {
			if c, ok := cores[dirName]; ok && c.Type() == metadata.ExplicitDirType {
				continue
			}

			implicitDir := &Core{
				Bucket:    d.Bucket(),
				FullName:  dirName,
				MinObject: nil,
			}
			cores[dirName] = implicitDir
		}
	}
	if len(unsupportedPrefixes) > 0 {
		logger.Errorf("Encountered unsupported prefixes during listing: %v", unsupportedPrefixes)
	}
	return
}