python-package/lets_plot/geo_data/geocodes.py (316 lines of code) (raw):

"""Geocoding results (``Geocodes``) and helpers to turn them into data frames."""

import enum
from abc import abstractmethod
from collections.abc import Iterable
from typing import List, Optional, Union, Dict

from pandas import DataFrame, Series

from lets_plot.geo_data_internals.constants import DF_COLUMN_HIGHLIGHTS, DF_COLUMN_COUNTRY, DF_COLUMN_STATE, \
    DF_COLUMN_COUNTY, DF_COLUMN_CITY, DF_COLUMN_ID, DF_COLUMN_FOUND_NAME, DF_COLUMN_POSITION, DF_COLUMN_LIMIT, \
    DF_COLUMN_CENTROID
from .gis.geocoding_service import GeocodingService
from .gis.request import PayloadKind, RequestBuilder, RequestKind, MapRegion, RegionQuery
from .gis.response import Answer, GeocodedFeature, Namesake, AmbiguousFeature, LevelKind
from .gis.response import SuccessResponse, Response, AmbiguousResponse, ErrorResponse
from .type_assertion import assert_type, assert_list_type

NO_OBJECTS_FOUND_EXCEPTION_TEXT = 'No objects were found.'
MULTIPLE_OBJECTS_FOUND_EXCEPTION_TEXT = "Multiple objects were found. Use all_result=True to see them."


class Resolution(enum.Enum):
    """Named boundary resolutions; values run from 1 (``world_low``) to 15 (``city_high``)."""
    city_high = 15
    city_medium = 14
    city_low = 13
    county_high = 12
    county_medium = 11
    county_low = 10
    state_high = 9
    state_medium = 8
    state_low = 7
    country_high = 6
    country_medium = 5
    country_low = 4
    world_high = 3
    world_medium = 2
    world_low = 1


class PlacesDataFrameBuilder:
    """Accumulate per-feature place columns (request, found name, parents) row by row."""

    def __init__(self, level_kind: LevelKind):
        self.level_kind: LevelKind = level_kind
        self._request: List[str] = []
        self._found_name: List[str] = []
        self._county: List[Optional[str]] = []
        self._state: List[Optional[str]] = []
        self._country: List[Optional[str]] = []

    def append_row(self, query: RegionQuery, feature: GeocodedFeature):
        """
        Append one row describing ``feature`` found for ``query``.

        ``query`` may be None (``_zip_answers`` produces None queries when no
        queries are available); in that case the feature name is used as the
        request string and the parent columns get the value of
        ``MapRegion.name_or_none(None)``.
        """
        # Resolve the request string only after the None check: the original code
        # dereferenced ``query.request`` first and crashed on a None query.
        request = None if query is None else query.request
        self._request.append(_select_request_string(request, feature.name))
        self._found_name.append(feature.name)

        if query is None:
            self._county.append(MapRegion.name_or_none(None))
            self._state.append(MapRegion.name_or_none(None))
            self._country.append(MapRegion.name_or_none(None))
        else:
            self._county.append(MapRegion.name_or_none(query.county))
            self._state.append(MapRegion.name_or_none(query.state))
            self._country.append(MapRegion.name_or_none(query.country))

    def build_dict(self):
        """
        Return the accumulated columns as a dict of column name -> list.

        The request column (named after the level kind) and the found-name column
        are always present. A parent column (county/state/country) is included
        only when at least one of its values is not None.
        """

        def contains_values(column):
            return any(v is not None for v in column)

        data = {}

        request_column = _level_to_column_name(self.level_kind)
        data[request_column] = self._request
        data[DF_COLUMN_FOUND_NAME] = self._found_name

        # NOTE: when a parent column has the same name as the request column
        # (e.g. county level with a county parent) it overwrites the request column.
        if contains_values(self._county):
            data[DF_COLUMN_COUNTY] = self._county

        if contains_values(self._state):
            data[DF_COLUMN_STATE] = self._state

        if contains_values(self._country):
            data[DF_COLUMN_COUNTRY] = self._country

        return data

    # NOTE(review): this class does not derive from abc.ABC, so ``abstractmethod``
    # does not prevent instantiation here; the method simply raises when called.
    @abstractmethod
    def to_data_frame(self, answers: List[Answer], queries: List[RegionQuery], level_kind: LevelKind) -> DataFrame:
        """Not implemented in this class; always raises ``ValueError``."""
        raise ValueError('Not implemented')


class Geocodes:
    """
    Result of a geocoding request: answers with their features and the queries
    that produced them. Payloads (boundaries, limits, centroids) are fetched on demand.
    """

    def __init__(self, level_kind: LevelKind, answers: List[Answer], queries: List[RegionQuery],
                 highlights: bool = False):
        """
        Parameters
        ----------
        level_kind: LevelKind
            Level used for geocoding.
        answers: List[Answer]
            Answers of the geocoding service.
        queries: List[RegionQuery]
            Queries matching ``answers`` one to one, or a single query with
            ``request is None`` when ``answers`` is empty.
        highlights: bool
            Whether ``to_data_frame()`` should include the highlights column.

        Raises
        ------
        ValueError
            If the ``geopandas`` module is not installed.
        """
        assert_list_type(answers, Answer)
        assert_list_type(queries, RegionQuery)

        if len(answers) == 0:
            assert len(queries) == 1 and queries[0].request is None  # select all
        else:
            assert len(queries) == len(answers)  # regular request - should have same size

        # geopandas is only probed here so that a missing dependency fails early.
        try:
            import geopandas
        except ImportError:
            raise ValueError('Module \'geopandas\'is required for geocoding') from None

        self._level_kind: LevelKind = level_kind
        self._answers: List[Answer] = answers

        features = []
        for answer in answers:
            features.extend(answer.features)

        self._geocoded_features: List[GeocodedFeature] = features
        self._highlights: bool = highlights
        self._queries: List[RegionQuery] = queries

    def __repr__(self):
        return self.to_data_frame().to_string()

    def __len__(self):
        return len(self._geocoded_features)

    def to_map_regions(self) -> List[MapRegion]:
        """Return one ``MapRegion.place`` per geocoded feature, in answer order."""
        regions: List[MapRegion] = []
        # ``_zip_answers`` takes (queries, answers). The original call passed them
        # swapped, which paired None with a query when there were no answers and
        # then failed on ``None.features``.
        for query, answer in _zip_answers(self._queries, self._answers):
            request = None if query is None else query.request
            for feature in answer.features:
                regions.append(
                    MapRegion.place(feature.id, _select_request_string(request, feature.name), self._level_kind))
        return regions

    def as_list(self) -> List['Geocodes']:
        """Split this object into a list of ``Geocodes``, one per answer."""
        if len(self._queries) == 0:
            return [Geocodes(self._level_kind, [answer], [RegionQuery(request=None)], self._highlights)
                    for answer in self._answers]

        assert len(self._queries) == len(self._answers)
        return [Geocodes(self._level_kind, [answer], [query], self._highlights)
                for query, answer in zip(self._queries, self._answers)]

    def unique_ids(self) -> List[str]:
        """Return feature ids without duplicates, keeping first-occurrence order."""
        seen = set()
        ids: List[str] = []
        for feature in self._geocoded_features:
            if feature.id not in seen:
                seen.add(feature.id)
                ids.append(feature.id)
        return ids

    def boundaries(self, resolution: Optional[Union[int, str, Resolution]] = None, inc_res: int = 0):
        """
        Return boundaries for given regions in form of GeoDataFrame.

        Parameters
        ----------
        resolution: [str | int | None]
            Boundaries resolution.

            int: [1-15]
                15 - maximum quality, 1 - maximum performance:
                - 1-3 for world scale view
                - 4-6 for country scale view
                - 7-9 for state scale view
                - 10-12 for county scale view
                - 13-15 for city scale view

            str: ['world', 'country', 'state', 'county', 'city']
                'city' - maximum quality, 'world' - maximum performance.
                Corresponding numeric resolutions:
                - 'world' - 2
                - 'country' - 5
                - 'state' - 8
                - 'county' - 11
                - 'city' - 14

                Kind of area expected to be displayed. Resolution depends on a number of objects -
                single state is a 'state' scale view, while 50 states is a 'country' scale view.

                It is allowed to use any kind of resolution for any regions, i.e. 'city' for state
                to see more detailed boundary (when need to show zoomed part), or 'world'
                (when used for small preview).

            None:
                Autodetection. Uses level_kind that was used for geocoding this regions object and
                number of objects in it. Prefers performance over quality. It's expected to get
                pixelated geometries with autodetection. Use explicit resolution for better quality.

                Resolution for countries:
                    If n < 3 => 3
                    else => 1

                Resolution for states:
                    If n < 3 => 7
                    If n < 10 => 4
                    else => 2

                Resolution for counties:
                    If n < 5 => 10
                    If n < 20 => 8
                    else => 3

                Resolution for cities:
                    If n < 5 => 13
                    If n < 50 => 4
                    else => 3

        inc_res: int
            Increase auto-detected resolution.

        Raises
        ------
        ValueError
            If ``resolution`` has an unsupported type or the resulting numeric
            resolution is outside of the [1, 15] range.

        Examples
        --------
        .. jupyter-execute::

            >>> from lets_plot.geo_data import *
            >>> rb = regions_country(['germany', 'russia']).boundaries()
            >>> rb
        """
        from lets_plot.geo_data.to_geo_data_frame import BoundariesGeoDataFrame

        if resolution is None:
            autodetected_resolution = _autodetect_resolution(self._level_kind, len(self._geocoded_features))
            # inc_res only applies to autodetection; the result is capped at the maximum resolution.
            int_resolution = min(Resolution.city_high.value, autodetected_resolution + inc_res)
        elif isinstance(resolution, int):
            int_resolution = resolution
        elif isinstance(resolution, Resolution):
            int_resolution = resolution.value
        elif isinstance(resolution, str):
            int_resolution = _parse_resolution(resolution).value
        else:
            raise ValueError('Invalid resolution: ' + type(resolution).__name__)

        if int_resolution < Resolution.world_low.value or int_resolution > Resolution.city_high.value:
            raise ValueError(
                "Resolution is out of range. Expected to be from ({}) to ({}), but was ({})."
                    .format(Resolution.world_low.value, Resolution.city_high.value, int_resolution)
            )

        return self._execute(
            self._request_builder(PayloadKind.boundaries)
                .set_resolution(int_resolution),
            BoundariesGeoDataFrame()
        )

    def limits(self) -> 'GeoDataFrame':
        """
        Return bboxes (Polygon geometry) for given regions in form of GeoDataFrame.
        For regions intersecting anti-meridian bbox will be divided into two and stored as two rows.

        Examples
        ---------
        .. jupyter-execute::

            >>> from lets_plot.geo_data import *
            >>> rl = regions_country(['germany', 'russia']).limits()
            >>> rl
        """
        from lets_plot.geo_data.to_geo_data_frame import LimitsGeoDataFrame
        return self._execute(
            self._request_builder(PayloadKind.limits),
            LimitsGeoDataFrame()
        )

    def centroids(self):
        """
        Return centroids (Point geometry) for given regions in form of GeoDataFrame.

        Examples
        ---------
        .. jupyter-execute::

            >>> from lets_plot.geo_data import *
            >>> rc = regions_country(['germany', 'russia']).centroids()
            >>> rc
        """
        from lets_plot.geo_data.to_geo_data_frame import CentroidsGeoDataFrame
        return self._execute(
            self._request_builder(PayloadKind.centroids),
            CentroidsGeoDataFrame()
        )

    def to_data_frame(self) -> DataFrame:
        """
        Return a DataFrame with one row per geocoded feature: id, place columns,
        centroid, position, limit and (when enabled) highlights.
        """
        places = PlacesDataFrameBuilder(self._level_kind)

        # for us-48 queries don't count
        for query, answer in _zip_answers(self._queries, self._answers):
            for feature in answer.features:
                places.append_row(query, feature)

        def geo_rect_to_list(geo_rect: 'GeoRect') -> List:
            return [geo_rect.start_lon, geo_rect.min_lat, geo_rect.end_lon, geo_rect.max_lat]

        data = {
            DF_COLUMN_ID: [feature.id for feature in self._geocoded_features],
            **places.build_dict(),
            DF_COLUMN_CENTROID: [[feature.centroid.lon, feature.centroid.lat] for feature in self._geocoded_features],
            DF_COLUMN_POSITION: [geo_rect_to_list(feature.position) for feature in self._geocoded_features],
            DF_COLUMN_LIMIT: [geo_rect_to_list(feature.limit) for feature in self._geocoded_features]
        }

        if self._highlights:
            data[DF_COLUMN_HIGHLIGHTS] = [feature.highlights for feature in self._geocoded_features]

        return DataFrame(data)

    def _execute(self, request_builder: RequestBuilder, df_converter):
        """
        Send the built request, merge the returned payloads into the stored
        features and convert the result with ``df_converter.to_data_frame``.

        Raises ``ValueError`` (via ``_raise_exception``) when the response is not
        a ``SuccessResponse``.
        """
        response = GeocodingService().do_request(request_builder.build())

        if not isinstance(response, SuccessResponse):
            _raise_exception(response)

        features = []
        for a in response.answers:
            features.extend(a.features)

        self._join_payload(features)
        return df_converter.to_data_frame(self._answers, self._queries, self._level_kind)

    def _request_builder(self, payload_kind: PayloadKind) -> RequestBuilder:
        """Return an explicit (by-id) request builder asking for ``payload_kind`` for all unique ids."""
        assert_type(payload_kind, PayloadKind)

        return RequestBuilder() \
            .set_request_kind(RequestKind.explicit) \
            .set_ids(self.unique_ids()) \
            .set_requested_payload([payload_kind])

    def _join_payload(self, payloads: List[GeocodedFeature]):
        """Copy every non-None payload field onto all stored features with the same id."""
        for payload in payloads:
            for feature in self._get_features(payload.id):
                if payload.limit is not None:
                    feature.limit = payload.limit

                if payload.boundary is not None:
                    feature.boundary = payload.boundary

                if payload.centroid is not None:
                    feature.centroid = payload.centroid

                if payload.position is not None:
                    feature.position = payload.position

    def _get_features(self, feature_id: str) -> List[GeocodedFeature]:
        """Return all stored features whose id equals ``feature_id``."""
        return [feature for feature in self._geocoded_features if feature.id == feature_id]


request_types = Optional[Union[str, List[str], Series]]


def _raise_exception(response: Response):
    """Raise ``ValueError`` with a message describing the non-successful ``response``."""
    msg = _format_error_message(response)
    raise ValueError(msg)


def _format_error_message(response: Response) -> str:
    """
    Build a human-readable error message for ``response``.

    For an ``AmbiguousResponse`` the not-found names are reported first (at most
    10 listed); only when there are none are the multiple-match details reported.
    """
    if isinstance(response, AmbiguousResponse):
        # A dict is used as an insertion-ordered set of not-found query names.
        not_found_names: Dict = {}
        multiple_objects: List[AmbiguousFeature] = []

        for ambiguous_feature in response.features:
            if ambiguous_feature.total_namesake_count == 0:
                not_found_names[ambiguous_feature.query] = None

            if ambiguous_feature.total_namesake_count > 0:
                multiple_objects.append(ambiguous_feature)

        if len(not_found_names) > 0:
            display_limit = 10
            names = list(not_found_names.keys())
            msg_text = 'No objects were found for '
            if len(names) > display_limit:
                msg_text += ', '.join(names[:display_limit])
                msg_text += ' and ({}) more'.format(len(names) - display_limit)
            else:
                msg_text += ', '.join(names)
            return msg_text + '.\n'

        if len(multiple_objects) > 0:
            return ''.join(
                _create_multiple_error_message(
                    multiple_object.query,
                    multiple_object.namesake_examples,
                    multiple_object.total_namesake_count
                ) + '\n'
                for multiple_object in multiple_objects
            )

        return 'Invalid bad feature'

    if isinstance(response, ErrorResponse):
        return response.message

    return 'Unsupported error response status: ' + str(response.__class__)


def _create_multiple_error_message(request: str, namesakes: List[Namesake], total_namesake_count: int):
    """Describe an ambiguous ``request``: the total match count plus one line per namesake example."""
    lines = []
    for namesake in namesakes:
        line = '- ' + namesake.name
        if len(namesake.parents) > 0:
            line += ' (' + ', '.join([o.name for o in namesake.parents]) + ')'
        lines.append(line)

    text = 'Multiple objects (' + str(total_namesake_count) + ') were found for ' + request
    if not lines:
        text += '.'
    else:
        text += ':\n' + '\n'.join(lines)
    return text


def _to_level_kind(level_kind: Optional[Union[str, LevelKind]]) -> Optional[LevelKind]:
    """Convert a string to ``LevelKind``; pass None and ``LevelKind`` through unchanged."""
    if level_kind is None:
        return None

    if isinstance(level_kind, LevelKind):
        return level_kind

    if isinstance(level_kind, str):
        return LevelKind(level_kind)

    raise ValueError('Invalid level kind')


def _parse_resolution(resolution: str) -> Resolution:
    """
    Convert a resolution name to ``Resolution``.

    The short names 'city', 'county', 'state', 'country' and 'world' map to the
    corresponding ``*_medium`` member; any other string is looked up as a member
    name (``Resolution[resolution]``, which raises ``KeyError`` for unknown names).

    Raises ``ValueError`` when ``resolution`` is not a string.
    """
    if isinstance(resolution, str):
        if resolution == 'city':
            return Resolution.city_medium

        if resolution == 'county':
            return Resolution.county_medium

        if resolution == 'state':
            return Resolution.state_medium

        if resolution == 'country':
            return Resolution.country_medium

        if resolution == 'world':
            return Resolution.world_medium

        return Resolution[resolution]

    raise ValueError('Invalid resolution type: ' + type(resolution).__name__)


def _ensure_is_list(obj) -> Optional[List[str]]:
    """Return None for None, a list copy for a non-string iterable, else ``[obj]``."""
    if obj is None:
        return None

    if isinstance(obj, Iterable) and not isinstance(obj, str):
        return list(obj)

    return [obj]


def _autodetect_resolution(level: LevelKind, count: int) -> int:
    """
    Pick a numeric resolution from the geocoding level and the number of objects.

    Raises ``ValueError`` for an unknown level (previously the function fell
    through and returned None, which failed later with a ``TypeError``).
    """
    if level == LevelKind.country:
        if count < 3:
            return Resolution.world_high.value
        else:
            return Resolution.world_low.value

    if level == LevelKind.state:
        if count < 3:
            return Resolution.state_low.value
        if count < 10:
            return Resolution.country_low.value
        else:
            return Resolution.world_medium.value

    if level == LevelKind.county:
        if count < 5:
            return Resolution.county_low.value
        elif count < 20:
            return Resolution.state_medium.value
        else:
            return Resolution.world_high.value

    if level == LevelKind.city:
        if count < 5:
            return Resolution.city_low.value
        elif count < 50:
            return Resolution.country_low.value
        else:
            return Resolution.world_high.value

    raise ValueError('Unknown level kind: {}'.format(level))


def _select_request_string(request: Optional[str], name: str) -> str:
    """Return ``request``, or ``name`` when the request is None, empty or 'us-48' (case-insensitive)."""
    if request is None:
        return name

    if len(request) == 0:
        return name

    if 'us-48' == request.lower():
        return name

    return request


def _level_to_column_name(level_kind: LevelKind):
    """Return the data frame column name used for the given level kind."""
    if level_kind == LevelKind.city:
        return DF_COLUMN_CITY
    elif level_kind == LevelKind.county:
        return DF_COLUMN_COUNTY
    elif level_kind == LevelKind.state:
        return DF_COLUMN_STATE
    elif level_kind == LevelKind.country:
        return DF_COLUMN_COUNTRY
    else:
        raise ValueError('Unknown level kind: {}'.format(level_kind))


def _zip_answers(queries: List, answers: List):
    """
    Pair queries with answers as ``(query, answer)`` tuples.

    When ``queries`` is empty every answer is paired with None.
    """
    if len(queries) > 0:
        return zip(queries, answers)
    else:
        return zip([None] * len(answers), answers)