petastorm/gcsfs_helpers/gcsfs_wrapper.py (49 lines of code) (raw):
# Copyright (c) 2017-2020 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import posixpath
from pyarrow.filesystem import FileSystem, DaskFileSystem
from pyarrow.util import implements, _stringify_path
class GCSFSWrapper(DaskFileSystem):
@implements(FileSystem.isdir)
def isdir(self, path):
from gcsfs.core import norm_path
path = norm_path(_stringify_path(path))
try:
contents = self.fs.ls(path)
if len(contents) == 1 and contents[0] == path:
return False
else:
return True
except OSError:
return False
@implements(FileSystem.isfile)
def isfile(self, path):
from gcsfs.core import norm_path
path = norm_path(_stringify_path(path))
try:
contents = self.fs.ls(path)
return len(contents) == 1 and contents[0] == path
except OSError:
return False
def walk(self, path):
"""
Directory tree generator, like os.walk
Generator version of what is in gcsfs, which yields a flattened list of
files
"""
from gcsfs.core import norm_path
path = norm_path(_stringify_path(path))
directories = set()
files = set()
for key in self.fs.ls(path, detail=True):
# each info name must be at least [path]/part , but here
# we check also for names like [path]/part/
path = key['name']
if key['storageClass'] == 'DIRECTORY':
if path.endswith('/'):
directories.add(path[:-1])
else:
directories.add(path)
elif key['storageClass'] == 'BUCKET':
pass
else:
files.add(path)
files = sorted([posixpath.split(f)[1] for f in files
if f not in directories])
directories = sorted([posixpath.split(x)[1]
for x in directories])
yield path, directories, files
for directory in directories:
for tup in self.walk(directory):
yield tup