python-package/lets_plot/plot/util.py (180 lines of code) (raw):

# # Copyright (c) 2019. JetBrains s.r.o. # Use of this source code is governed by the MIT license that can be found in the LICENSE file. # from typing import Any, Tuple, Sequence, Optional, Dict, List from lets_plot._type_utils import is_pandas_data_frame, is_polars_dataframe from lets_plot.geo_data_internals.utils import find_geo_names from lets_plot.mapping import MappingMeta from lets_plot.plot.core import aes, FeatureSpec, PlotSpec from lets_plot.plot.series_meta import _infer_type, TYPE_UNKNOWN, TYPE_DATE_TIME, _detect_time_zone def as_boolean(val, *, default): if val is None: return default return bool(val) and val != 'False' def update_plot_aes_mapping(plot: PlotSpec, add_mapping: FeatureSpec): existing_spec = plot.props().get('mapping', aes()) merged_mapping = {**existing_spec.as_dict(), **add_mapping.as_dict()} # Re-annotate the data with the merged mapping. data = plot.props().get('data', None) data, processed_mapping, data_meta = as_annotated_data(data, aes(**merged_mapping)) plot.props()['data'] = data plot.props()['mapping'] = processed_mapping # Add data_meta to plot properties for key, value in data_meta.items(): plot.props()[key] = value def as_annotated_data(data: Any, mapping_spec: FeatureSpec) -> Tuple: data_type_by_var: Dict[str, str] = {} # VarName to Type mapping_meta_by_var: Dict[str, Dict[str, MappingMeta]] = {} # VarName to Dict[Aes, MappingMeta] mappings = {} # Aes to VarName # fill mapping_meta_by_var, mappings and data_type_by_var. if mapping_spec is not None: for key, spec in mapping_spec.props().items(): # the key is either an aesthetic name or 'name' (FeatureSpec.name property) if key == 'name': # ignore FeatureSpec.name property continue if isinstance(spec, MappingMeta): mappings[key] = spec.variable mapping_meta_by_var.setdefault(spec.variable, {})[key] = spec data_type_by_var[spec.variable] = TYPE_UNKNOWN else: mappings[key] = spec # spec is a variable name data_type_by_var.update(_infer_type(data)) # Detect the tome zone - one for the entire data set. time_zone_by_var_name = {} for var_name, data_type in data_type_by_var.items(): if data_type == TYPE_DATE_TIME: time_zone = _detect_time_zone(var_name, data) if time_zone is not None: time_zone_by_var_name[var_name] = time_zone # fill series annotations series_annotations = {} # var to series_annotation for var_name, data_type in data_type_by_var.items(): series_annotation = {} if data_type != TYPE_UNKNOWN: series_annotation['type'] = data_type if var_name in time_zone_by_var_name: series_annotation['time_zone'] = time_zone_by_var_name[var_name] if is_pandas_data_frame(data) and data[var_name].dtype.name == 'category' and data[var_name].dtype.ordered: series_annotation['factor_levels'] = data[var_name].cat.categories.to_list() elif is_polars_dataframe(data): import polars col_dtype = data[var_name].dtype if isinstance(col_dtype, polars.datatypes.Enum): series_annotation['factor_levels'] = list(col_dtype.categories) elif isinstance(col_dtype, polars.datatypes.Categorical): # # It does not seem possible to get categories in correct order from the Categorical dtype. # categories_series = data[var_name].cat.get_categories() # indises = [col_dtype.categories[cat] for cat in categories_series] # cats = [col_dtype.categories[i] for i in indises] # series_annotation['factor_levels'] = categories_series.to_list() pass elif var_name in mapping_meta_by_var: levels = last_not_none(list(map(lambda mm: mm.levels, mapping_meta_by_var[var_name].values()))) if levels is not None: series_annotation['factor_levels'] = levels if 'factor_levels' in series_annotation and var_name in mapping_meta_by_var: order = last_not_none(list(map(lambda mm: mm.parameters['order'], mapping_meta_by_var[var_name].values()))) if order is not None: series_annotation['order'] = order if len(series_annotation) > 0: series_annotation['column'] = var_name series_annotations[var_name] = series_annotation # fill mapping annotations mapping_annotations = [] for var_name, meta_data in mapping_meta_by_var.items(): for aesthetic, mapping_meta in meta_data.items(): if mapping_meta.annotation == 'as_discrete': if 'factor_levels' in series_annotations.get(var_name, {}): # there is a bug - if label is set then levels are not applied continue mapping_annotation = {} # Note that the label is always set; otherwise, the scale title will appear as 'color.cyl' label = mapping_meta.parameters.get('label') if label is not None: mapping_annotation.setdefault('parameters', {})['label'] = label if mapping_meta.levels is not None: mapping_annotation['levels'] = mapping_meta.levels order_by = mapping_meta.parameters.get('order_by') if order_by is not None: mapping_annotation.setdefault('parameters', {})['order_by'] = order_by order = mapping_meta.parameters.get('order') if order is not None: mapping_annotation.setdefault('parameters', {})['order'] = order # add mapping meta if a custom label is set or if series annotation for var doesn't contain order options # otherwise don't add mapping meta - it's redundant, nothing unique compared to series annotation if len(mapping_annotation): mapping_annotation['aes'] = aesthetic mapping_annotation['annotation'] = 'as_discrete' mapping_annotations.append(mapping_annotation) data_meta = {} if len(series_annotations) > 0: data_meta.update({'series_annotations': list(series_annotations.values())}) if len(mapping_annotations) > 0: data_meta.update({'mapping_annotations': mapping_annotations}) return data, aes(**mappings), {'data_meta': data_meta} def is_data_pub_stream(data: Any) -> bool: # try: # from lets_plot.display import DataPubStream # return isinstance(data, DataPubStream) # except ImportError: # return False # no pub-sub in standalone deployment return False def normalize_map_join(map_join): if map_join is None: return None def invalid_map_join_format(): return ValueError("map_join must be a str, list[str] or pair of list[str]") if isinstance(map_join, str): # 'foo' -> [['foo'], None] data_names = [map_join] map_names = None elif isinstance(map_join, Sequence): if all(isinstance(v, str) for v in map_join): # all items are strings if len(map_join) == 1: # ['foo'] -> [['foo'], None] data_names = map_join map_names = None elif len(map_join) == 2: # ['foo', 'bar'] -> [['foo'], ['bar']] data_names = [map_join[0]] map_names = [map_join[1]] elif len(map_join) > 2: # ['foo', 'bar', 'baz'] -> error raise ValueError( "map_join of type list[str] expected to have 1 or 2 items, but was {}".format(len(map_join))) else: raise invalid_map_join_format() elif all(isinstance(v, Sequence) and not isinstance(v, str) for v in map_join): # all items are lists if len(map_join) == 1: # [['foo', 'bar']] -> [['foo', 'bar'], None] data_names = map_join[0] map_names = None elif len(map_join) == 2: # [['foo', 'bar'], ['baz', 'qux']] -> [['foo', 'bar'], ['baz', 'qux']] data_names = map_join[0] map_names = map_join[1] else: raise invalid_map_join_format() else: raise invalid_map_join_format() else: raise invalid_map_join_format() return [data_names, map_names] def auto_join_geo_names(map_join: Any, gdf): if map_join is None: return None data_names = map_join[0] map_names = map_join[1] if map_names is None: map_names = find_geo_names(gdf) if len(map_names) == 0: raise ValueError( "Can't deduce joining keys.\n" "Define both data and map key columns in map_join " "explicitly: map_join=[['data_column'], ['map_column']]." ) if len(data_names) > len(map_names): raise ValueError( "Data key columns count exceeds map key columns count: {} > {}".format(len(data_names), len(map_names)) ) map_names = map_names[:len(data_names)] # use same number of key columns return [data_names, map_names] def is_geo_data_frame(data: Any) -> bool: try: from geopandas import GeoDataFrame return isinstance(data, GeoDataFrame) except ImportError: return False def get_geo_data_frame_meta(geo_data_frame) -> dict: return { 'geodataframe': { 'geometry': geo_data_frame.geometry.name } } def geo_data_frame_to_crs(gdf: 'GeoDataFrame', use_crs: Optional[str]): if gdf.crs is None: return gdf return gdf.to_crs('EPSG:4326' if use_crs is None else use_crs) def key_int2str(data): if is_pandas_data_frame(data): if data.columns.inferred_type == 'integer' or data.columns.inferred_type == 'mixed-integer': data.columns = data.columns.astype(str) return data if isinstance(data, dict): return {(str(k) if isinstance(k, int) else k): v for k, v in data.items()} return data def last_not_none(lst: List) -> Optional[Any]: for i in reversed(lst): if i is not None: return i return None