o2a/utils/el_utils.py (89 lines of code) (raw):

# -*- coding: utf-8 -*- # Copyright 2019 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Utilities used by EL functions""" import codecs import logging import os import re from copy import deepcopy from typing import List, Optional, Tuple, Union, Dict from urllib.parse import urlparse from jinja2 import StrictUndefined, Environment from jinja2.exceptions import UndefinedError from o2a.converter.exceptions import ParseException from o2a.o2a_libs.src.o2a_lib import el_parser from o2a.o2a_libs.src.o2a_lib.property_utils import PropertySet def strip_el(el_function: str) -> str: """ Given an el function or variable like ${ variable }, strips out everything except for the variable. """ return re.sub("[${}]", "", el_function).strip() def extract_evaluate_properties(properties_file: Optional[str], props: PropertySet): """ Parses the job_properties file into a dictionary, if the value has and EL function in it, it gets replaced with the corresponding value that has already been parsed. For example, a file like: job.job_properties host=user@google.com command=ssh ${host} The job_properties would be parsed like: PROPERTIES = { host: 'user@google.com', command='ssh user@google.com', } """ copy_of_props = deepcopy(props) properties_read_from_file: Dict[str, str] = {} if not properties_file: return properties_read_from_file if not os.path.isfile(properties_file): logging.warning(f"The job_properties file is missing: {properties_file}") return properties_read_from_file with open(properties_file) as prop_file: for line in prop_file.readlines(): if line.startswith("#") or line.startswith(" ") or line.startswith("\n"): continue key, value = _evaluate_properties_line( line, known_values=properties_read_from_file, props=copy_of_props ) # Set the value of property in the copy of property set for further reference copy_of_props.action_node_properties[key] = value properties_read_from_file[key] = value return properties_read_from_file def _evaluate_properties_line(line: str, known_values: dict, props: PropertySet) -> Tuple[str, str]: """ Evaluates single line from properties file using already known values from the file and values from passed property set. """ key, value = line.split("=", 1) translation = el_parser.translate(value) tmp = deepcopy(known_values) tmp.update(props.merged) env = Environment(undefined=StrictUndefined) try: translation = env.from_string(translation).render(**tmp) except UndefinedError: translation = value return key.strip(), translation.strip() def comma_separated_string_to_list(line: str) -> Union[List[str], str]: """ Converts a comma-separated string to a List of strings. If the input is a single item (no comma), it will be returned unchanged. """ values = line.split(",") return values[0] if len(values) <= 1 else values def _resolve_name_node(translation: str, props: PropertySet) -> Tuple[Optional[str], int]: """ Check if props include nameNode, nameNode1 or nameNode2 value. """ merged = props.merged for key in ["nameNode", "nameNode1", "nameNode2"]: start_str = "{{" + key + "}}" name_node = merged.get(key) if translation.startswith(start_str) and name_node: return name_node, len(start_str) return None, 0 def normalize_path(url: str, props: PropertySet, allow_no_schema=False, translated=False) -> str: """ Transforms url by replacing EL-expression with equivalent jinja templates and returns only the path part of the url. If schema validation is required then props should include proper name-node. If translated is set to True then passed url is supposed to be a valid jinja expression. For example: input: '{$nameNode}/users/{$userName}/dir url_with_var: `{{nameNode}}/users/{{userName}}/dir In this case to validate url schema props should contain `nameNode` value. """ url_with_var = url if translated else el_parser.translate(url) name_node, shift = _resolve_name_node(url_with_var, props) if name_node: url_parts = urlparse(name_node) output = url_with_var[shift:] else: url_parts = urlparse(url_with_var) output = url_parts.path allowed_schemas = {"hdfs", ""} if allow_no_schema else {"hdfs"} if url_parts.scheme not in allowed_schemas: raise ParseException( f"Unknown path format. The URL should be provided in the following format: " f"hdfs://localhost:9200/path. Current value: {url_with_var}" ) return output def replace_url_el(url: str, props: PropertySet, allow_no_schema=False) -> str: """ Transforms url by replacing EL-expression with equivalent jinja templates. If schema validation is required then props should include proper name-node. For example: input: '{$nameNode}/users/{$userName}/dir url_with_var: `{{nameNode}}/users/{{userName}}/dir In this case to validate url schema props should contain `nameNode` value. """ url_with_var = el_parser.translate(url) name_node, _ = _resolve_name_node(url_with_var, props) if name_node: url_parts = urlparse(name_node) else: url_parts = urlparse(url_with_var) allowed_schemas = {"hdfs", ""} if allow_no_schema else {"hdfs"} if url_parts.scheme not in allowed_schemas: raise ParseException( f"Unknown path format. The URL should be provided in the following format: " f"hdfs://localhost:9200/path. Current value: {url_with_var}" ) return url_with_var def escape_string_with_python_escapes(string_to_escape: Optional[str]) -> Optional[str]: if not string_to_escape: return None escaped_bytes, _ = codecs.escape_encode(string_to_escape.encode()) # type: ignore # C-Api level return "'" + escaped_bytes.decode("utf-8") + "'" # type: ignore