o2a/utils/el_utils.py (89 lines of code) (raw):
# -*- coding: utf-8 -*-
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities used by EL functions"""
import codecs
import logging
import os
import re
from copy import deepcopy
from typing import List, Optional, Tuple, Union, Dict
from urllib.parse import urlparse
from jinja2 import StrictUndefined, Environment
from jinja2.exceptions import UndefinedError
from o2a.converter.exceptions import ParseException
from o2a.o2a_libs.src.o2a_lib import el_parser
from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet
def strip_el(el_function: str) -> str:
    """
    Reduces an EL function or variable such as ${ variable } to its bare content.

    Every "$", "{" and "}" character is removed, wherever it appears in the
    input, and leading/trailing whitespace is trimmed from the result.
    """
    el_markers = re.compile("[${}]")
    without_markers = el_markers.sub("", el_function)
    return without_markers.strip()
def extract_evaluate_properties(properties_file: Optional[str], props: PropertySet):
    """
    Parses the job_properties file into a dictionary. If a value has
    an EL function in it, it gets replaced with the corresponding
    value that has already been parsed. For example, a file like:

    job.job_properties
        host=user@google.com
        command=ssh ${host}

    The job_properties would be parsed like:
        PROPERTIES = {
            host: 'user@google.com',
            command: 'ssh user@google.com',
        }

    Lines starting with "#" or with a space, as well as blank and
    whitespace-only lines, are skipped.

    :param properties_file: path of the properties file; may be None or empty.
    :param props: property set used when evaluating the values. It is deep-copied,
        so the object passed by the caller is never modified.
    :return: dictionary of property names to evaluated values. It is empty when no
        path was given or when the file does not exist (a warning is logged then).
    """
    properties_read_from_file: Dict[str, str] = {}
    if not properties_file:
        return properties_read_from_file

    if not os.path.isfile(properties_file):
        logging.warning(f"The job_properties file is missing: {properties_file}")
        return properties_read_from_file

    # Copy only once we know the file will be read - the early exits above do not need it.
    copy_of_props = deepcopy(props)
    with open(properties_file) as prop_file:
        for line in prop_file:
            # `not line.strip()` covers empty lines as well as lines made only of
            # tabs/spaces, which would otherwise fail on the "key=value" split.
            if line.startswith(("#", " ")) or not line.strip():
                continue
            key, value = _evaluate_properties_line(
                line, known_values=properties_read_from_file, props=copy_of_props
            )
            # Set the value of property in the copy of property set for further reference
            copy_of_props.action_node_properties[key] = value
            properties_read_from_file[key] = value
    return properties_read_from_file
def _evaluate_properties_line(line: str, known_values: dict, props: PropertySet) -> Tuple[str, str]:
    """
    Evaluates one "key=value" line of a properties file.

    The line is split on the first "=". The value is translated with the EL parser
    and rendered as a jinja template, using the values already read from the file
    overlaid with the merged values of the passed property set. When the template
    refers to a variable that is not defined, the raw value is kept instead.

    Returns the key and the evaluated value, both stripped of surrounding whitespace.
    """
    raw_key, raw_value = line.split("=", 1)
    template_source = el_parser.translate(raw_value)

    # Values from the property set take precedence over the ones read from the file.
    render_context = deepcopy(known_values)
    render_context.update(props.merged)

    # StrictUndefined turns a reference to an unknown variable into an UndefinedError.
    jinja_env = Environment(undefined=StrictUndefined)
    try:
        evaluated = jinja_env.from_string(template_source).render(**render_context)
    except UndefinedError:
        evaluated = raw_value

    return raw_key.strip(), evaluated.strip()
def comma_separated_string_to_list(line: str) -> Union[List[str], str]:
    """
    Splits a comma-separated string into a List of strings.

    When the input holds a single item (there is no comma in it), that item is
    returned as a plain string rather than as a one-element list.
    """
    first_item, *other_items = line.split(",")
    if other_items:
        return [first_item, *other_items]
    return first_item
def _resolve_name_node(translation: str, props: PropertySet) -> Tuple[Optional[str], int]:
    """
    Looks for a nameNode, nameNode1 or nameNode2 placeholder at the start of the translation.

    Returns the value stored in the merged properties for the first of these keys whose
    jinja placeholder (for example {{nameNode}}) begins the translation and whose value
    is non-empty, together with the length of that placeholder. Returns (None, 0) when
    no such key is found.
    """
    merged_properties = props.merged
    for candidate in ("nameNode", "nameNode1", "nameNode2"):
        placeholder = "{{" + candidate + "}}"
        if not translation.startswith(placeholder):
            continue
        configured_value = merged_properties.get(candidate)
        if configured_value:
            return configured_value, len(placeholder)
    return None, 0
def normalize_path(url: str, props: PropertySet, allow_no_schema=False, translated=False) -> str:
    """
    Replaces EL-expressions in the url with equivalent jinja templates and returns
    only the path part of the url.

    If translated is set to True the url is assumed to be a valid jinja expression
    already and is not translated again.

    For example:
        input: `${nameNode}/users/${userName}/dir`
        url_with_var: `{{nameNode}}/users/{{userName}}/dir`

    When the translated url starts with a name-node placeholder that has a value in
    props, the schema of that value is validated and the part of the url following the
    placeholder is returned. Otherwise the schema of the url itself is validated and
    its path component is returned. So in the example above props should contain
    the `nameNode` value for the schema validation to succeed.

    The only accepted schema is "hdfs"; a missing schema is accepted as well when
    allow_no_schema is True. Any other schema raises ParseException.
    """
    if translated:
        jinja_url = url
    else:
        jinja_url = el_parser.translate(url)

    name_node, placeholder_length = _resolve_name_node(jinja_url, props)
    if name_node:
        # The schema comes from the configured name-node, the path from the rest of the url.
        parsed_url = urlparse(name_node)
        path = jinja_url[placeholder_length:]
    else:
        parsed_url = urlparse(jinja_url)
        path = parsed_url.path

    accepted_schemas = {"hdfs"}
    if allow_no_schema:
        accepted_schemas.add("")

    if parsed_url.scheme in accepted_schemas:
        return path

    raise ParseException(
        f"Unknown path format. The URL should be provided in the following format: "
        f"hdfs://localhost:9200/path. Current value: {jinja_url}"
    )
def replace_url_el(url: str, props: PropertySet, allow_no_schema=False) -> str:
    """
    Replaces EL-expressions in the url with equivalent jinja templates and returns
    the whole translated url.

    For example:
        input: `${nameNode}/users/${userName}/dir`
        url_with_var: `{{nameNode}}/users/{{userName}}/dir`

    The schema is validated before the url is returned. When the translated url starts
    with a name-node placeholder that has a value in props, the schema of that value
    is checked; otherwise the schema of the translated url itself is checked. So in
    the example above props should contain the `nameNode` value.

    The only accepted schema is "hdfs"; a missing schema is accepted as well when
    allow_no_schema is True. Any other schema raises ParseException.
    """
    jinja_url = el_parser.translate(url)
    name_node, _ = _resolve_name_node(jinja_url, props)

    # Validate against the configured name-node when there is one, else against the url.
    schema_source = name_node or jinja_url
    schema = urlparse(schema_source).scheme

    accepted_schemas = {"hdfs"}
    if allow_no_schema:
        accepted_schemas.add("")

    if schema in accepted_schemas:
        return jinja_url

    raise ParseException(
        f"Unknown path format. The URL should be provided in the following format: "
        f"hdfs://localhost:9200/path. Current value: {jinja_url}"
    )
def escape_string_with_python_escapes(string_to_escape: Optional[str]) -> Optional[str]:
    """
    Converts a string into a single-quoted Python string literal.

    Backslashes, single quotes, control characters and non-ASCII characters are
    replaced with Python escape sequences, so the result is pure ASCII and evaluates
    back to exactly the original string.

    :param string_to_escape: text to escape; may be None.
    :return: the quoted and escaped literal, or None when the input is None or empty.
    """
    if not string_to_escape:
        return None
    # Escape at the character level. Escaping the UTF-8 bytes instead would emit one
    # \xNN per byte, which a str literal reads back as separate (wrong) characters.
    escaped = string_to_escape.encode("unicode_escape").decode("ascii")
    # The unicode_escape codec leaves quotes untouched, so protect the delimiter here.
    escaped = escaped.replace("'", "\\'")
    return "'" + escaped + "'"