# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import csv
import hashlib
import io
import json
import logging
import os
import re
import urllib.parse
from datetime import datetime, timezone
from pathlib import Path
from time import mktime

import json_fix  # noqa: F401
import magic
import parse
import parsedatetime
import yaml
from autolink import linkify
from google.cloud import storage
from tablepyxl import tablepyxl


def make_list(s):
    """Wrap a scalar value in a list; pass lists through unchanged."""
    if not isinstance(s, list):
        return [s]
    return s


def add_links(s):
    """Turn bare URLs in a string into HTML links."""
    return linkify(s)


def urlencode(s):
    """Percent-encode a string for safe use in a URL."""
    return urllib.parse.quote(s)


def re_escape(s):
    """Escape regular expression metacharacters in a string."""
    return re.escape(s)


def json_encode(v):
    """Serialize a value to JSON, logging the offending value on failure."""
    try:
        return json.dumps(v)
    except Exception as exc:
        logger = logging.getLogger('pubsub2inbox')
        logger.error('Exception when trying to encode JSON!',
                     extra={
                         'value': v,
                         'error': str(exc)
                     })
        raise exc


def yaml_encode(v):
    """Serialize a value to YAML, logging the offending value on failure."""
    try:
        return yaml.dump(v)
    except Exception as exc:
        logger = logging.getLogger('pubsub2inbox')
        logger.error('Exception when trying to encode YAML!',
                     extra={
                         'value': v,
                         'error': str(exc)
                     })
        raise exc


def json_decode(v):
    """Parse a JSON string into a Python value."""
    return json.loads(v)


def yaml_decode(v):
    """Parse a YAML string into a Python value (safe loader)."""
    return yaml.load(v, Loader=yaml.SafeLoader)


def b64decode(v):
    """Decode a base64-encoded string back into a string."""
    return base64.b64decode(v.encode()).decode()


def csv_encode(v, **kwargs):
    """Encode a list of values as a single CSV row; kwargs are passed
    through to csv.writer (e.g. delimiter, quoting)."""
    output = io.StringIO()
    csvwriter = csv.writer(output, **kwargs)
    csvwriter.writerow(v)
    return output.getvalue()
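
# Illustrative example, not part of the original module: with the default
# "excel" dialect, csv.writer quotes fields as needed and terminates the
# row with \r\n:
#   csv_encode(['a', 'b,c'])               ->  'a,"b,c"\r\n'
#   csv_encode(['a', 'b'], delimiter=';')  ->  'a;b\r\n'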


def html_table_to_xlsx(s):
    """Convert an HTML table into an XLSX workbook, returned base64-encoded."""
    if s.strip() == '':
        return ''
    workbook = tablepyxl.document_to_workbook(s)
    output = io.BytesIO()
    workbook.save(output)
    return base64.encodebytes(output.getvalue()).decode('utf-8')
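
# Illustrative example with a hypothetical table; the base64 result is
# suitable for use as an email attachment body:
#   html_table_to_xlsx('<table><tr><td>cell</td></tr></table>')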


def read_gcs_object(url, start=None, end=None):
    """Read an object from Cloud Storage (optionally a byte range) and
    return its contents base64-encoded. URL should be in format
    "gs://bucket/file"."""
    parsed_url = urllib.parse.urlparse(url)
    if parsed_url.scheme != 'gs':
        raise InvalidSchemeURLException(
            'Invalid scheme for read_gcs_object(%s): %s' %
            (url, parsed_url.scheme))
    client = storage.Client()
    bucket = client.bucket(parsed_url.netloc)
    blob = bucket.get_blob(parsed_url.path[1:])
    if not blob:
        raise ObjectNotFoundException(
            'Failed to download object %s from bucket %s' %
            (parsed_url.path[1:], parsed_url.netloc))
    contents = blob.download_as_bytes(start=start, end=end)
    return base64.encodebytes(contents).decode('utf-8')
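
# Illustrative example (hypothetical bucket and object); start/end are
# passed through to download_as_bytes() as an inclusive byte range:
#   read_gcs_object('gs://my-bucket/path/file.txt')           # whole object
#   read_gcs_object('gs://my-bucket/path/file.txt', 0, 1023)  # first 1 KiB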


def read_file(filename):
    """Read a local file and return its contents as text."""
    return Path(filename).read_text()


def read_file_b64(filename):
    """Read a local file and return its contents base64-encoded."""
    # Decode so the filter returns a string rather than a bytes object.
    return base64.b64encode(Path(filename).read_bytes()).decode('ascii')


def filemagic(contents):
    """Detect the MIME type of base64-encoded content using libmagic."""
    with magic.Magic(flags=magic.MAGIC_MIME_TYPE) as m:
        return m.id_buffer(base64.b64decode(contents))
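
# Illustrative example (hypothetical path): filemagic() operates on
# base64-encoded content, so it composes with read_file_b64() and
# read_gcs_object():
#   filemagic(read_file_b64('/tmp/logo.png'))  ->  'image/png'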


class ObjectNotFoundException(Exception):
    pass


class InvalidSchemeURLException(Exception):
    pass


class InvalidSchemeSignedURLException(Exception):
    pass


def generate_signed_url(url, expiration, **kwargs):
    """Returns a signed URL to a GCS object. URL should be in format
    "gs://bucket/file". Expiration accepts natural language (e.g.
    "in 2 hours") via parsedatetime."""
    # Calendar.parse() returns a (struct_time, context) tuple
    expiration_parsed, _ = parsedatetime.Calendar(
        version=parsedatetime.VERSION_CONTEXT_STYLE).parse(expiration)
    expiration = datetime.fromtimestamp(mktime(expiration_parsed),
                                        timezone.utc)
    parsed_url = urllib.parse.urlparse(url)
    if parsed_url.scheme != 'gs':
        raise InvalidSchemeSignedURLException(
            'Invalid scheme for generate_signed_url: %s' % parsed_url.scheme)
    client = storage.Client()
    bucket = client.bucket(parsed_url.netloc)
    blob = bucket.get_blob(parsed_url.path[1:])
    if not blob:
        raise ObjectNotFoundException(
            'Failed to find object %s in bucket %s' %
            (parsed_url.path[1:], parsed_url.netloc))
    signed_url = blob.generate_signed_url(expiration=expiration,
                                          version='v4',
                                          **kwargs)
    return signed_url
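
# Illustrative example (hypothetical bucket and object); extra kwargs are
# forwarded to Blob.generate_signed_url():
#   generate_signed_url('gs://my-bucket/report.pdf', 'in 2 hours')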


def hash_string(v, hash_type='md5'):
    """Hash a string with any algorithm known to hashlib and return the
    hex digest."""
    h = hashlib.new(hash_type)
    h.update(v.encode('utf-8'))
    return h.hexdigest()
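
# Illustrative example:
#   hash_string('hello', 'sha256')
#   -> '2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824'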


def parse_string(v, spec):
    """Parse a string against a format spec (the inverse of str.format)
    and return the named fields, or None if it doesn't match."""
    r = parse.parse(spec, v)
    if r is not None:
        return r.named
    return None
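
# Illustrative example: spec uses the "parse" library's format syntax, and
# only named fields are returned:
#   parse_string('user@example.com', '{user}@{domain}')
#   -> {'user': 'user', 'domain': 'example.com'}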


def parse_url(v):
    """Parse a URL into its components, plus "name" (the file name) and
    "prefix" (the directory part of the path)."""
    parsed_url = urllib.parse.urlparse(v)
    res = {}
    # Copy all public attributes of the ParseResult (scheme, netloc, path,
    # hostname, port, query, fragment, ...)
    for k in dir(parsed_url):
        if not k.startswith('_'):
            res[k] = getattr(parsed_url, k)
    res['name'] = os.path.basename(res['path'])
    res['prefix'] = os.path.dirname(res['path'])
    return res
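
# Illustrative example:
#   parse_url('gs://my-bucket/dir/file.txt')['netloc']  ->  'my-bucket'
#   parse_url('gs://my-bucket/dir/file.txt')['name']    ->  'file.txt'
#   parse_url('gs://my-bucket/dir/file.txt')['prefix']  ->  '/dir'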


def trim(v):
    """Strip leading and trailing whitespace."""
    return v.strip()


def ltrim(v):
    """Strip leading whitespace."""
    return v.lstrip()


def rtrim(v):
    """Strip trailing whitespace."""
    return v.rstrip()


def remove_mrkdwn(v, links=False, italic=True):
    """Strip Slack mrkdwn formatting (links, bold, strikethrough and
    optionally italic) from a string, or recursively from a list of
    strings. With links=True the URL is kept, otherwise the link label."""
    if isinstance(v, list):
        removed = []
        for s in v:
            removed.append(remove_mrkdwn(s, links, italic))
        return removed
    if not isinstance(v, str):
        return v
    # links: <url|label> becomes either the url or the label
    if links:
        v = re.sub(r'<([^\|]*)\|[^\|]*>', r'\1', v)
    else:
        v = re.sub(r'<[^\|]*\|([^\|]*)>', r'\1', v)
    # bolding
    v = re.sub(r'\*([\w\s!\?\(\)\{\}\.,:;\+=&]+)\*', r'\1', v)
    # strikethrough
    v = re.sub(r'~([\w\s!\?\(\)\{\}\.,:;\+=&]+)~', r'\1', v)
    # italic (can be a bit problematic, so it can be disabled with
    # italic=False)
    if italic:
        v = re.sub(r'\_([a-zA-Z0-9\s!\?\(\)\{\}\.,:;\+=&]+?)\_', r'\1', v)
    return v
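
# Illustrative example: with the defaults the link label is kept and italic
# markers are stripped:
#   remove_mrkdwn('*bold* ~gone~ <https://example.com|click here>')
#   -> 'bold gone click here'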