cmd/zc_traverser_blob.go (464 lines of code) (raw):

// Copyright © 2017 Microsoft <wastore@microsoft.com> // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
package cmd

import (
	"context"
	"fmt"
	"net/url"
	"strings"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
	"github.com/pkg/errors"

	"github.com/Azure/azure-storage-azcopy/v10/common"
	"github.com/Azure/azure-storage-azcopy/v10/common/parallel"
)

// blobTraverser allows us to iterate through a path pointing to the blob endpoint.
type blobTraverser struct {
	rawURL        string
	serviceClient *service.Client
	ctx           context.Context
	recursive     bool

	// parallel listing employs the hierarchical listing API which is more expensive
	// cx should have the option to disable this optimization in the name of saving costs
	parallelListing bool

	// whether to include blobs that have metadata 'hdi_isfolder = true'
	includeDirectoryStubs bool

	// a generic function to notify that a new stored object has been enumerated
	incrementEnumerationCounter enumerationCounterFunc

	s2sPreserveSourceTags bool

	cpkOptions common.CpkOptions

	preservePermissions common.PreservePermissionsOption

	includeDeleted bool

	includeSnapshot bool

	includeVersion bool

	isDFS bool
}

// NonErrorDirectoryStubOverlappable is a sentinel value stating that a directory stub exists and can overlap.
// NOTE(review): it is not referenced anywhere in this file; presumably consumed elsewhere in the package.
var NonErrorDirectoryStubOverlappable = errors.New("The directory stub exists, and can overlap.")

// IsDirectory reports whether t.rawURL should be treated as a directory.
//
// A container, or a path ending in '/', is always a directory; for a bare container the first
// listing page is requested so that a missing container surfaces as an error (with a
// GetProperties fallback when listing is not permitted).
// A non-DFS destination without a trailing '/' is always a file.
// Otherwise the blob's properties are consulted; if that fails, a one-item listing under
// "<blobName>/" decides whether a virtual directory exists. common.FILE_NOT_FOUND is returned
// when neither a blob nor any child is found.
func (t *blobTraverser) IsDirectory(isSource bool) (isDirectory bool, err error) {
	isDirDirect := copyHandlerUtil{}.urlIsContainerOrVirtualDirectory(t.rawURL)

	blobURLParts, err := blob.ParseURL(t.rawURL)
	if err != nil {
		return false, err
	}

	// Skip the single blob check if we're checking a destination.
	// This is an individual exception for blob because blob supports virtual directories and blobs sharing the same name.
	// On HNS accounts, we would still perform this test. The user may have provided directory
	// name without path-separator
	if isDirDirect { // a container or a path ending in '/' is always directory
		if blobURLParts.ContainerName != "" && blobURLParts.BlobName == "" {
			// If it's a container, let's ensure that container exists. Listing is a safe assumption to be valid, because how else would we enumerate?
			containerClient := t.serviceClient.NewContainerClient(blobURLParts.ContainerName)
			p := containerClient.NewListBlobsFlatPager(nil)
			_, err = p.NextPage(t.ctx)

			if bloberror.HasCode(err, bloberror.AuthorizationPermissionMismatch) {
				// Maybe we don't have the ability to list? Can we get container properties as a fallback?
				_, propErr := containerClient.GetProperties(t.ctx, nil)
				err = common.Iff(propErr == nil, nil, err)
			}
		}

		return true, err
	}
	if !isSource && !t.isDFS {
		// destination on blob endpoint. If it does not end in '/' it is a file
		return false, nil
	}

	// All sources and DFS-destinations we'll look further
	// This call is fine, because there is no trailing / here-- If there's a trailing /, this is surely referring
	_, _, isDirStub, _, blobErr := t.getPropertiesIfSingleBlob()

	// We know for sure this is a single blob still, let it walk on through to the traverser.
	if bloberror.HasCode(blobErr, bloberror.BlobUsesCustomerSpecifiedEncryption) {
		return false, nil
	}

	if blobErr == nil {
		return isDirStub, nil
	}

	// No blob by that exact name: look for at least one child under "<name>/".
	containerClient := t.serviceClient.NewContainerClient(blobURLParts.ContainerName)
	searchPrefix := strings.TrimSuffix(blobURLParts.BlobName, common.AZCOPY_PATH_SEPARATOR_STRING) + common.AZCOPY_PATH_SEPARATOR_STRING
	maxResults := int32(1)
	pager := containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{Prefix: &searchPrefix, MaxResults: &maxResults})
	resp, err := pager.NextPage(t.ctx)
	if err != nil {
		// Listing failures are deliberately non-fatal here: log and fall back to "file".
		if azcopyScanningLogger != nil {
			msg := fmt.Sprintf("Failed to check if the destination is a folder or a file (Azure Blob). Assuming the destination is a file: %s", err)
			azcopyScanningLogger.Log(common.LogError, msg)
		}
		return false, nil
	}

	if len(resp.Segment.BlobItems) == 0 {
		// Not a directory, but there was also no file on site. Therefore, there's nothing.
		return false, errors.New(common.FILE_NOT_FOUND)
	}

	return true, nil
}

// getPropertiesIfSingleBlob fetches the properties of the blob t.rawURL points at, if there is one.
//
// Returns:
//   - response: the properties response, nil when nothing was found or the URL is a container.
//   - isBlob: true when a blob was found that does not represent a folder.
//   - isDirStub: true when a blob was found that represents a folder (directory stub).
//   - blobName: the blob name that was actually resolved (possibly with the trailing '/' trimmed).
//   - err: the parse, client-creation or GetProperties error; nil for a container URL.
func (t *blobTraverser) getPropertiesIfSingleBlob() (response *blob.GetPropertiesResponse, isBlob bool, isDirStub bool, blobName string, err error) {
	// trim away the trailing slash before we check whether it's a single blob
	// so that we can detect the directory stub in case there is one
	blobURLParts, err := blob.ParseURL(t.rawURL)
	if err != nil {
		return nil, false, false, "", err
	}

	if blobURLParts.BlobName == "" {
		// This is a container, which needs to be given a proper listing.
		return nil, false, false, "", nil
	}

	/*
		If the user specified a trailing /, they may mean:
		A) `folder/` with `hdi_isfolder`, this is intentional.
		B) `folder` with `hdi_isfolder`
		C) a virtual directory with children, but no stub
	*/

retry:
	blobClient, err := createBlobClientFromServiceClient(blobURLParts, t.serviceClient)
	if err != nil {
		return nil, false, false, blobURLParts.BlobName, err
	}

	props, err := blobClient.GetProperties(t.ctx, &blob.GetPropertiesOptions{CPKInfo: t.cpkOptions.GetCPKInfo()})

	if err != nil && strings.HasSuffix(blobURLParts.BlobName, common.AZCOPY_PATH_SEPARATOR_STRING) {
		// Trim & retry, maybe the directory stub is DFS style.
		// TrimSuffix removes exactly one separator, so this loop terminates once the name no longer ends in '/'.
		blobURLParts.BlobName = strings.TrimSuffix(blobURLParts.BlobName, common.AZCOPY_PATH_SEPARATOR_STRING)
		goto retry
	} else if err == nil {
		// We found the target blob, great! Let's return the details.
		isDir := gCopyUtil.doesBlobRepresentAFolder(props.Metadata)
		return &props, !isDir, isDir, blobURLParts.BlobName, nil
	}

	// We found nothing.
	return nil, false, false, "", err
}

// getBlobTags fetches the tags of the blob t.rawURL points at (trailing '/' trimmed) and returns
// them with both keys and values query-escaped. On a GetTags failure the (empty) map is returned
// together with the error.
func (t *blobTraverser) getBlobTags() (common.BlobTags, error) {
	blobURLParts, err := blob.ParseURL(t.rawURL)
	if err != nil {
		return nil, err
	}
	blobURLParts.BlobName = strings.TrimSuffix(blobURLParts.BlobName, common.AZCOPY_PATH_SEPARATOR_STRING)

	// perform the check
	blobClient, err := createBlobClientFromServiceClient(blobURLParts, t.serviceClient)
	if err != nil {
		return nil, err
	}

	blobTagsMap := make(common.BlobTags)
	blobGetTagsResp, err := blobClient.GetTags(t.ctx, nil)
	if err != nil {
		return blobTagsMap, err
	}

	for _, blobTag := range blobGetTagsResp.BlobTagSet {
		blobTagsMap[url.QueryEscape(*blobTag.Key)] = url.QueryEscape(*blobTag.Value)
	}
	return blobTagsMap, nil
}

// Traverse enumerates the objects reachable from t.rawURL and hands each one that passes filters
// to processor.
//
// The root itself is scheduled first when it is a single blob, a directory stub (recursive with
// includeDirectoryStubs), or a container root (when permissions are preserved or on DFS).
// The remaining children are then listed via parallelList or serialList depending on
// t.parallelListing.
func (t *blobTraverser) Traverse(preprocessor objectMorpher, processor objectProcessor, filters []ObjectFilter) (err error) {
	blobURLParts, err := blob.ParseURL(t.rawURL)
	if err != nil {
		return err
	}

	// check if the url points to a single blob
	blobProperties, isBlob, isDirStub, blobName, err := t.getPropertiesIfSingleBlob()

	var respErr *azcore.ResponseError
	if errors.As(err, &respErr) {
		// Don't error out unless it's a CPK error just yet
		// If it's a CPK error, we know it's a single blob and that we can't get the properties on it anyway.
		if respErr.ErrorCode == string(bloberror.BlobUsesCustomerSpecifiedEncryption) {
			return errors.New("this blob uses customer provided encryption keys (CPK). At the moment, AzCopy does not support CPK-encrypted blobs. " +
				"If you wish to make use of this blob, we recommend using one of the Azure Storage SDKs")
		}
		if respErr.RawResponse == nil {
			return fmt.Errorf("cannot list files due to reason %s", respErr)
		} else if respErr.StatusCode == 403 { // Some nature of auth error-- Whatever the user is pointing at, they don't have access to, regardless of whether it's a file or a dir stub.
			return fmt.Errorf("cannot list files due to reason %s", respErr)
		}
	}

	// schedule the blob in two cases:
	// 	1. either we are targeting a single blob and the URL wasn't explicitly pointed to a virtual dir
	//	2. either we are scanning recursively with includeDirectoryStubs set to true,
	//	   then we add the stub blob that represents the directory
	if (isBlob && !strings.HasSuffix(blobURLParts.BlobName, common.AZCOPY_PATH_SEPARATOR_STRING)) ||
		(t.includeDirectoryStubs && isDirStub && t.recursive) {
		// sanity checking so highlighting doesn't highlight things we're not worried about.
		if blobProperties == nil {
			panic("isBlob should never be set if getting properties is an error")
		}

		if azcopyScanningLogger != nil {
			azcopyScanningLogger.Log(common.LogDebug, "Detected the root as a blob.")
			azcopyScanningLogger.Log(common.LogDebug, fmt.Sprintf("Root entity type: %s", getEntityType(blobProperties.Metadata)))
		}

		relPath := ""
		if strings.HasSuffix(blobName, "/") {
			relPath = "\x00" // Because the ste will trim the / suffix from our source, or we may not already have it.
		}

		blobPropsAdapter := blobPropertiesResponseAdapter{blobProperties}
		storedObject := newStoredObject(
			preprocessor,
			getObjectNameOnly(blobName),
			relPath,
			getEntityType(blobPropsAdapter.Metadata),
			blobPropsAdapter.LastModified(),
			blobPropsAdapter.ContentLength(),
			blobPropsAdapter,
			blobPropsAdapter,
			blobPropsAdapter.Metadata,
			blobURLParts.ContainerName,
		)

		if t.s2sPreserveSourceTags {
			blobTagsMap, err := t.getBlobTags()
			if err != nil {
				// Surface the failure to the caller instead of crashing the process.
				return fmt.Errorf("couldn't fetch blob tags due to error: %w", err)
			}
			if len(blobTagsMap) > 0 {
				storedObject.blobTags = blobTagsMap
			}
		}
		if t.incrementEnumerationCounter != nil {
			t.incrementEnumerationCounter(storedObject.entityType)
		}

		err := processIfPassedFilters(filters, storedObject, processor)
		_, err = getProcessingError(err)

		// short-circuit if we don't have anything else to scan and permanent delete is not on
		if !t.includeDeleted && (isBlob || err != nil) {
			return err
		}
	} else if blobURLParts.BlobName == "" && (t.preservePermissions.IsTruthy() || t.isDFS) {
		// If the root is a container and we're copying "folders", we should persist the ACLs there too.
		// For DFS, we should always include the container root.
		if azcopyScanningLogger != nil {
			azcopyScanningLogger.Log(common.LogDebug, "Detected the root as a container.")
		}

		storedObject := newStoredObject(
			preprocessor,
			"",
			"",
			common.EEntityType.Folder(),
			time.Now(),
			0,
			noContentProps,
			noBlobProps,
			common.Metadata{},
			blobURLParts.ContainerName,
		)

		if t.incrementEnumerationCounter != nil {
			t.incrementEnumerationCounter(common.EEntityType.Folder())
		}

		err := processIfPassedFilters(filters, storedObject, processor)
		_, err = getProcessingError(err)
		if err != nil {
			return err
		}
	}

	// get the container URL so that we can list the blobs
	containerClient := t.serviceClient.NewContainerClient(blobURLParts.ContainerName)

	// get the search prefix to aid in the listing
	// example: for a url like https://test.blob.core.windows.net/test/foo/bar/bla
	// the search prefix would be foo/bar/bla
	searchPrefix := blobURLParts.BlobName

	// append a slash if it is not already present
	// example: foo/bar/bla becomes foo/bar/bla/ so that we only list children of the virtual directory
	if searchPrefix != "" && !strings.HasSuffix(searchPrefix, common.AZCOPY_PATH_SEPARATOR_STRING) && !t.includeSnapshot && !t.includeDeleted {
		searchPrefix += common.AZCOPY_PATH_SEPARATOR_STRING
	}

	// as a performance optimization, get an extra prefix to do pre-filtering. It's typically the start portion of a blob name.
	extraSearchPrefix := FilterSet(filters).GetEnumerationPreFilter(t.recursive)

	if t.parallelListing {
		return t.parallelList(containerClient, blobURLParts.ContainerName, searchPrefix, extraSearchPrefix, preprocessor, processor, filters)
	}

	return t.serialList(containerClient, blobURLParts.ContainerName, searchPrefix, extraSearchPrefix, preprocessor, processor, filters)
}

// parallelList enumerates the container with the hierarchical listing API, crawling virtual
// directories through parallel.Crawl starting at searchPrefix+extraSearchPrefix.
//
// Blobs that represent a folder are skipped in the regular item listing; when
// t.includeDirectoryStubs is set (and t.recursive), each listed virtual directory is probed with
// GetProperties ("name", then "name/") and emitted as a Folder object if a stub is found.
// The first listing or processing error aborts the traversal and is returned.
func (t *blobTraverser) parallelList(containerClient *container.Client, containerName string, searchPrefix string,
	extraSearchPrefix string, preprocessor objectMorpher, processor objectProcessor, filters []ObjectFilter) error {
	// Define how to enumerate its contents
	// This func must be thread safe/goroutine safe
	enumerateOneDir := func(dir parallel.Directory, enqueueDir func(parallel.Directory), enqueueOutput func(parallel.DirectoryEntry, error)) error {
		currentDirPath := dir.(string)

		pager := containerClient.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
			Prefix:  &currentDirPath,
			Include: container.ListBlobsInclude{Metadata: true, Tags: t.s2sPreserveSourceTags, Deleted: t.includeDeleted, Snapshots: t.includeSnapshot, Versions: t.includeVersion},
		})
		// marker holds the continuation token of the previous page; it is only used for debug logging.
		var marker *string
		for pager.More() {
			lResp, err := pager.NextPage(t.ctx)
			if err != nil {
				return fmt.Errorf("cannot list files due to reason %s", err)
			}
			// queue up the sub virtual directories if recursive is true
			if t.recursive {
				for _, virtualDir := range lResp.Segment.BlobPrefixes {
					enqueueDir(*virtualDir.Name)
					if azcopyScanningLogger != nil {
						azcopyScanningLogger.Log(common.LogDebug, fmt.Sprintf("Enqueuing sub-directory %s for enumeration.", *virtualDir.Name))
					}

					if t.includeDirectoryStubs {
						// try to get properties on the directory itself, since it's not listed in BlobItems
						dName := strings.TrimSuffix(*virtualDir.Name, common.AZCOPY_PATH_SEPARATOR_STRING)
						blobClient := containerClient.NewBlobClient(dName)
					altNameCheck:
						pResp, err := blobClient.GetProperties(t.ctx, nil)
						if err == nil {
							if !t.doesBlobRepresentAFolder(pResp.Metadata) { // We've picked up on a file *named* the folder, not the folder itself. Does folder/ exist?
								if !strings.HasSuffix(dName, "/") {
									blobClient = containerClient.NewBlobClient(dName + common.AZCOPY_PATH_SEPARATOR_STRING) // Tack on the path separator, check.
									dName += common.AZCOPY_PATH_SEPARATOR_STRING
									goto altNameCheck // "foo" is a file, what about "foo/"?
								}

								goto skipDirAdd // We shouldn't add a blob that isn't a folder as a folder. You either have the folder metadata, or you don't.
							}

							pbPropAdapter := blobPropertiesResponseAdapter{&pResp}
							folderRelativePath := strings.TrimPrefix(dName, searchPrefix)

							storedObject := newStoredObject(
								preprocessor,
								getObjectNameOnly(dName),
								folderRelativePath,
								common.EEntityType.Folder(),
								pbPropAdapter.LastModified(),
								pbPropAdapter.ContentLength(),
								pbPropAdapter,
								pbPropAdapter,
								pbPropAdapter.Metadata,
								containerName,
							)

							if t.s2sPreserveSourceTags {
								// Tags on directory stubs are best-effort: a GetTags failure leaves blobTags unset.
								tResp, tagErr := blobClient.GetTags(t.ctx, nil)

								if tagErr == nil {
									blobTagsMap := common.BlobTags{}
									for _, blobTag := range tResp.BlobTagSet {
										blobTagsMap[url.QueryEscape(*blobTag.Key)] = url.QueryEscape(*blobTag.Value)
									}
									storedObject.blobTags = blobTagsMap
								}
							}

							// GetProperties succeeded in this branch, so there is no error to report.
							enqueueOutput(storedObject, nil)
						} else {
							// There was nothing there, but is there folder/?
							if !strings.HasSuffix(dName, "/") {
								blobClient = containerClient.NewBlobClient(dName + common.AZCOPY_PATH_SEPARATOR_STRING) // Tack on the path separator, check.
								dName += common.AZCOPY_PATH_SEPARATOR_STRING
								goto altNameCheck // "foo" is a file, what about "foo/"?
							}
						}
					skipDirAdd:
					}
				}
			}

			// process the blobs returned in this result segment
			for _, blobInfo := range lResp.Segment.BlobItems {
				// if the blob represents a hdi folder, then skip it
				if t.doesBlobRepresentAFolder(blobInfo.Metadata) {
					continue
				}

				storedObject := t.createStoredObjectForBlob(preprocessor, blobInfo, strings.TrimPrefix(*blobInfo.Name, searchPrefix), containerName)

				// edge case, blob name happens to be the same as root and ends in /
				if storedObject.relativePath == "" && strings.HasSuffix(storedObject.name, "/") {
					storedObject.relativePath = "\x00" // Short circuit, letting the backend know we *really* meant root/.
				}

				if t.s2sPreserveSourceTags && blobInfo.BlobTags != nil {
					blobTagsMap := common.BlobTags{}
					for _, blobTag := range blobInfo.BlobTags.BlobTagSet {
						blobTagsMap[url.QueryEscape(*blobTag.Key)] = url.QueryEscape(*blobTag.Value)
					}
					storedObject.blobTags = blobTagsMap
				}

				enqueueOutput(storedObject, nil)
			}

			// if debug mode is on, note down the result, this is not going to be fast
			if azcopyScanningLogger != nil && azcopyScanningLogger.ShouldLog(common.LogDebug) {
				tokenValue := "NONE"
				if marker != nil {
					tokenValue = *marker
				}

				var vdirListBuilder strings.Builder
				for _, virtualDir := range lResp.Segment.BlobPrefixes {
					fmt.Fprintf(&vdirListBuilder, " %s,", *virtualDir.Name)
				}
				var fileListBuilder strings.Builder
				for _, blobInfo := range lResp.Segment.BlobItems {
					fmt.Fprintf(&fileListBuilder, " %s,", *blobInfo.Name)
				}
				msg := fmt.Sprintf("Enumerating %s with token %s. Sub-dirs:%s Files:%s", currentDirPath,
					tokenValue, vdirListBuilder.String(), fileListBuilder.String())
				azcopyScanningLogger.Log(common.LogDebug, msg)
			}

			marker = lResp.NextMarker
		}
		return nil
	}

	// initiate parallel scanning, starting at the root path
	workerContext, cancelWorkers := context.WithCancel(t.ctx)
	defer cancelWorkers()
	cCrawled := parallel.Crawl(workerContext, searchPrefix+extraSearchPrefix, enumerateOneDir, EnumerationParallelism)

	for x := range cCrawled {
		item, workerError := x.Item()
		if workerError != nil {
			return workerError
		}

		object := item.(StoredObject)

		// Report the object's real entity type: directory stubs are enqueued as folders, not files.
		if t.incrementEnumerationCounter != nil {
			t.incrementEnumerationCounter(object.entityType)
		}

		processErr := processIfPassedFilters(filters, object, processor)
		_, processErr = getProcessingError(processErr)
		if processErr != nil {
			return processErr
		}
	}

	return nil
}

// getEntityType derives the entity type from blob metadata: Folder when the POSIX folder
// metadata value equals "true" (case-insensitively), Symlink when the POSIX symlink metadata
// value does, and File otherwise. A present key with a nil or non-"true" value does not count.
func getEntityType(metadata map[string]*string) common.EntityType {
	if folderValue, isFolder := common.TryReadMetadata(metadata, common.POSIXFolderMeta); isFolder && folderValue != nil && strings.EqualFold(*folderValue, "true") {
		return common.EEntityType.Folder()
	}
	if symlinkValue, isSymlink := common.TryReadMetadata(metadata, common.POSIXSymlinkMeta); isSymlink && symlinkValue != nil && strings.EqualFold(*symlinkValue, "true") {
		return common.EEntityType.Symlink()
	}

	return common.EEntityType.File()
}

// createStoredObjectForBlob builds a StoredObject from a listed blob item.
//
// relativePath is used as given; the entity type comes from the blob's metadata. The deleted
// flag is always copied; the snapshot ID is recorded only when both includeDeleted and
// includeSnapshot are set, otherwise the version ID is recorded when includeVersion is set.
// NOTE(review): dereferences Properties.ContentLength unconditionally — assumes the listing
// always populates it; confirm against the service contract.
func (t *blobTraverser) createStoredObjectForBlob(preprocessor objectMorpher, blobInfo *container.BlobItem, relativePath string, containerName string) StoredObject {
	adapter := blobPropertiesAdapter{blobInfo.Properties}

	if azcopyScanningLogger != nil {
		azcopyScanningLogger.Log(common.LogDebug, fmt.Sprintf("Blob %s entity type: %s", relativePath, getEntityType(blobInfo.Metadata)))
	}

	object := newStoredObject(
		preprocessor,
		getObjectNameOnly(*blobInfo.Name),
		relativePath,
		getEntityType(blobInfo.Metadata),
		adapter.LastModified(),
		*adapter.BlobProperties.ContentLength,
		adapter,
		adapter, // adapter satisfies both interfaces
		blobInfo.Metadata,
		containerName,
	)

	object.blobDeleted = common.IffNotNil(blobInfo.Deleted, false)
	if t.includeDeleted && t.includeSnapshot {
		object.blobSnapshotID = common.IffNotNil(blobInfo.Snapshot, "")
	} else if t.includeVersion && blobInfo.VersionID != nil {
		object.blobVersionID = common.IffNotNil(blobInfo.VersionID, "")
	}
	return object
}

// doesBlobRepresentAFolder reports whether the metadata marks the blob as a folder, delegating
// to copyHandlerUtil.doesBlobRepresentAFolder.
func (t *blobTraverser) doesBlobRepresentAFolder(metadata map[string]*string) bool {
	util := copyHandlerUtil{}
	return util.doesBlobRepresentAFolder(metadata) // We should ignore these, because we pick them up in other ways.
}

// serialList enumerates the container with the flat listing API under
// searchPrefix+extraSearchPrefix, skipping blobs that represent folders and, when not recursive,
// blobs whose path relative to searchPrefix contains a path separator.
// The first listing or processing error aborts the traversal and is returned.
func (t *blobTraverser) serialList(containerClient *container.Client, containerName string, searchPrefix string,
	extraSearchPrefix string, preprocessor objectMorpher, processor objectProcessor, filters []ObjectFilter) error {

	// see the TO DO in GetEnumerationPreFilter if/when we make this more directory-aware

	// TODO optimize for the case where recursive is off
	prefix := searchPrefix + extraSearchPrefix
	pager := containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
		Prefix:  &prefix,
		Include: container.ListBlobsInclude{Metadata: true, Tags: t.s2sPreserveSourceTags, Deleted: t.includeDeleted, Snapshots: t.includeSnapshot, Versions: t.includeVersion},
	})
	for pager.More() {
		resp, err := pager.NextPage(t.ctx)
		if err != nil {
			return fmt.Errorf("cannot list blobs. Failed with error %s", err.Error())
		}
		// process the blobs returned in this result segment
		for _, blobInfo := range resp.Segment.BlobItems {
			// if the blob represents a hdi folder, then skip it
			if t.doesBlobRepresentAFolder(blobInfo.Metadata) {
				continue
			}

			relativePath := strings.TrimPrefix(*blobInfo.Name, searchPrefix)
			// when not recursive, skip anything that lives below the top level
			if !t.recursive && strings.Contains(relativePath, common.AZCOPY_PATH_SEPARATOR_STRING) {
				continue
			}

			storedObject := t.createStoredObjectForBlob(preprocessor, blobInfo, relativePath, containerName)

			// edge case, blob name happens to be the same as root and ends in /
			if storedObject.relativePath == "" && strings.HasSuffix(storedObject.name, "/") {
				storedObject.relativePath = "\x00" // Short circuit, letting the backend know we *really* meant root/.
			}

			// Setting blob tags
			if t.s2sPreserveSourceTags && blobInfo.BlobTags != nil {
				blobTagsMap := common.BlobTags{}
				for _, blobTag := range blobInfo.BlobTags.BlobTagSet {
					blobTagsMap[url.QueryEscape(*blobTag.Key)] = url.QueryEscape(*blobTag.Value)
				}
				storedObject.blobTags = blobTagsMap
			}

			// Report the object's real entity type, matching Traverse and parallelList.
			if t.incrementEnumerationCounter != nil {
				t.incrementEnumerationCounter(storedObject.entityType)
			}

			processErr := processIfPassedFilters(filters, storedObject, processor)
			_, processErr = getProcessingError(processErr)
			if processErr != nil {
				return processErr
			}
		}
	}

	return nil
}

// newBlobTraverser builds a blobTraverser from the given options.
//
// Parallel (hierarchical) listing is on by default. It is switched off when the
// DisableHierarchicalScanning environment variable is "true" (case-insensitive), or when that
// variable is unset/"false" and includeDeleted is combined with includeSnapshot or
// includeVersion (permanent delete); in the latter case a notice is printed to stdout.
func newBlobTraverser(rawURL string, serviceClient *service.Client, ctx context.Context, recursive, includeDirectoryStubs bool, incrementEnumerationCounter enumerationCounterFunc, s2sPreserveSourceTags bool, cpkOptions common.CpkOptions, includeDeleted, includeSnapshot, includeVersion bool, preservePermissions common.PreservePermissionsOption, isDFS bool) (t *blobTraverser) {
	t = &blobTraverser{
		rawURL:                      rawURL,
		serviceClient:               serviceClient,
		ctx:                         ctx,
		recursive:                   recursive,
		includeDirectoryStubs:       includeDirectoryStubs,
		incrementEnumerationCounter: incrementEnumerationCounter,
		parallelListing:             true,
		s2sPreserveSourceTags:       s2sPreserveSourceTags,
		cpkOptions:                  cpkOptions,
		includeDeleted:              includeDeleted,
		includeSnapshot:             includeSnapshot,
		includeVersion:              includeVersion,
		preservePermissions:         preservePermissions,
		isDFS:                       isDFS,
	}

	disableHierarchicalScanning := strings.ToLower(common.GetEnvironmentVariable(common.EEnvironmentVariable.DisableHierarchicalScanning()))

	// disableHierarchicalScanning should be true for permanent delete
	if (disableHierarchicalScanning == "false" || disableHierarchicalScanning == "") && includeDeleted && (includeSnapshot || includeVersion) {
		t.parallelListing = false
		fmt.Println("AZCOPY_DISABLE_HIERARCHICAL_SCAN has been set to true to permanently delete soft-deleted snapshots/versions.")
	}

	if disableHierarchicalScanning == "true" {
		// TODO log to frontend log that parallel listing was disabled, once the frontend log PR is merged
		t.parallelListing = false
	}
	return
}

// createBlobClientFromServiceClient returns a blob client for the container/blob named in
// blobURLParts. A non-empty Snapshot takes precedence over a non-empty VersionID; with neither
// set, the plain blob client is returned.
func createBlobClientFromServiceClient(blobURLParts blob.URLParts, client *service.Client) (*blob.Client, error) {
	containerClient := client.NewContainerClient(blobURLParts.ContainerName)
	blobClient := containerClient.NewBlobClient(blobURLParts.BlobName)
	if blobURLParts.Snapshot != "" {
		return blobClient.WithSnapshot(blobURLParts.Snapshot)
	}
	if blobURLParts.VersionID != "" {
		return blobClient.WithVersionID(blobURLParts.VersionID)
	}
	return blobClient, nil
}