python-package/lets_plot/geo_data/gis/json_response.py (254 lines of code) (raw):

import json from enum import Enum from functools import partial from typing import Dict, List, Optional, Union from .fluent_dict import FluentDict from .geometry import Ring from .response import Multipolygon, GeoPoint, GeoRect, Boundary, Polygon from .response import Response, ResponseBuilder, SuccessResponse, AmbiguousResponse from .response import Status, LevelKind, Answer, GeocodedFeature, AmbiguousFeature, Namesake, NamesakeParent, \ FeatureBuilder class ResponseField(Enum): status = 'status' message = 'message' data = 'data' answers = 'answers' features = 'features' geocoded_data = 'good_features' incorrect_data = 'bad_features' level = 'level' position = 'position' query = 'query' name = 'name' highlights = 'highlights' limit = 'limit' centroid = 'centroid' boundary = 'boundary' boundary_type = 'type' geo_object_id = 'id' boundary_lon = 'lon' boundary_lat = 'lat' boundary_coordinates = 'coordinates' centroid_lon = 'lon' centroid_lat = 'lat' min_lon = 'min_lon' min_lat = 'min_lat' max_lon = 'max_lon' max_lat = 'max_lat' total_namesake_count = 'total_namesake_count' namesake_examples = 'namesake_examples' namesake_name = 'name' namesake_parents = 'parents' namesake_parent_name = 'name' namesake_parent_level = 'level' class GeometryKind(Enum): point = 'Point' polygon = 'Polygon' multipolygon = 'MultiPolygon' class ResponseParser: @staticmethod def parse(response_json: Dict) -> Response: response = ResponseBuilder() response_dict = FluentDict(response_json) \ .visit_enum(ResponseField.status, Status, response.set_status) \ .visit_str(ResponseField.message, response.set_message) if response.status == Status.error: return response.build() data_dict = FluentDict(response_dict.get(ResponseField.data)) \ .visit_enum_existing(ResponseField.level, LevelKind, response.set_level) if response.status == Status.success: data_dict.visit(ResponseField.answers, partial(ResponseParser._parse_answers, response=response)) elif response.status == Status.ambiguous: data_dict.visit(ResponseField.features, partial(ResponseParser._parse_ambiguous_features, response=response)) else: raise ValueError('Unknown response kind') return response.build() @staticmethod def _parse_answers(answers_json: List[Dict], response: ResponseBuilder): answers: List[Answer] = [] for answer_json in answers_json: features_json = answer_json.get(ResponseField.features.value, []) geocoded_features: List[GeocodedFeature] = [] for feature_json in features_json: feature = FeatureBuilder() FluentDict(feature_json) \ .visit_str(ResponseField.geo_object_id, feature.set_id) \ .visit_str(ResponseField.name, feature.set_name) \ .visit_str_list_optional(ResponseField.highlights, feature.set_highlights) \ .visit_str_existing(ResponseField.boundary, lambda json: feature.set_boundary(GeoJson().parse_geometry(json))) \ .visit_object_optional(ResponseField.centroid, lambda json: feature.set_centroid(ResponseParser._parse_point(json))) \ .visit_object_optional(ResponseField.limit, lambda json: feature.set_limit(ResponseParser._parse_rect(json))) \ .visit_object_optional(ResponseField.position, lambda json: feature.set_position(ResponseParser._parse_rect(json))) geocoded_features.append(feature.build_geocoded()) answers.append(Answer(geocoded_features)) response.set_answers(answers) @staticmethod def _parse_ambiguous_features(features_json: List[Dict], response: ResponseBuilder): ambiguous_features: List[AmbiguousFeature] = [] for feature_json in features_json: feature = FeatureBuilder() FluentDict(feature_json) \ .visit_str(ResponseField.query, feature.set_query) \ .visit_int(ResponseField.total_namesake_count, feature.set_total_namesake_count) \ .visit_objects(ResponseField.namesake_examples, lambda json: feature.add_namesake(ResponseParser._parse_namesake(json))) ambiguous_features.append(feature.build_ambiguous()) response.set_ambiguous_features(ambiguous_features) @staticmethod def _parse_point(centroid_dict: FluentDict) -> GeoPoint: return GeoPoint( centroid_dict.get(ResponseField.centroid_lon), centroid_dict.get(ResponseField.centroid_lat) ) @staticmethod def _parse_rect(rect_dict: FluentDict) -> GeoRect: return GeoRect( rect_dict.get(ResponseField.min_lon), rect_dict.get(ResponseField.min_lat), rect_dict.get(ResponseField.max_lon), rect_dict.get(ResponseField.max_lat), ) @staticmethod def _parse_namesake(namesake_dict: FluentDict): return Namesake( namesake_dict.get(ResponseField.namesake_name), namesake_dict.get_objects(ResponseField.namesake_parents) .map( lambda parent: NamesakeParent( parent.get(ResponseField.namesake_parent_name), parent.get_enum(ResponseField.namesake_parent_level, LevelKind))) .list() ) class ResponseFormatter: @staticmethod def format(response: Response) -> Dict: if isinstance(response, SuccessResponse): return FluentDict() \ .put(ResponseField.status, Status.success.value) \ .put(ResponseField.message, response.message) \ .put(ResponseField.data, FluentDict() .put(ResponseField.level, response.level.value) .put(ResponseField.answers, list(map(ResponseFormatter._format_answer, response.answers)))) \ .to_dict() elif isinstance(response, AmbiguousResponse): return FluentDict() \ .put(ResponseField.status, Status.ambiguous.value) \ .put(ResponseField.message, response.message) \ .put(ResponseField.data, FluentDict() .put(ResponseField.level, response.level.value) .put(ResponseField.features, list(map(ResponseFormatter._format_ambiguous_feature, response.features)))) \ .to_dict() @staticmethod def _format_answer(answer: Answer) -> Dict: features = [] for feature in answer.features: features.append( FluentDict() \ .put(ResponseField.geo_object_id, feature.id) \ .put(ResponseField.name, feature.name) \ .put(ResponseField.boundary, ResponseFormatter._format_boundary(feature.boundary)) \ .put(ResponseField.centroid, ResponseFormatter._format_centroid(feature.centroid)) \ .put(ResponseField.limit, ResponseFormatter._format_rect(feature.limit)) \ .put(ResponseField.position, ResponseFormatter._format_rect(feature.position)) \ .to_dict() ) return FluentDict() \ .put(ResponseField.features, features) \ .to_dict() @staticmethod def _format_centroid(point: Optional[GeoPoint]) -> Optional[Dict]: if point is None: return None return FluentDict() \ .put(ResponseField.centroid_lon, point.lon) \ .put(ResponseField.centroid_lat, point.lat) \ .to_dict() @staticmethod def _format_rect(rect: Optional[GeoRect]) -> Optional[Dict]: if rect is None: return None return FluentDict() \ .put(ResponseField.min_lon, rect.start_lon) \ .put(ResponseField.min_lat, rect.min_lat) \ .put(ResponseField.max_lon, rect.end_lon) \ .put(ResponseField.max_lat, rect.max_lat) \ .to_dict() @staticmethod def _format_boundary(boundary: Optional[Boundary]) -> Optional[str]: if boundary is None: return None return GeoJson.format_geometry(boundary) @staticmethod def _format_ambiguous_feature(feaure: AmbiguousFeature) -> Dict: return FluentDict() \ .put(ResponseField.query, feaure.query) \ .put(ResponseField.total_namesake_count, feaure.total_namesake_count) \ .put(ResponseField.namesake_examples, list(map(ResponseFormatter._format_namesake, feaure.namesake_examples))) \ .to_dict() @staticmethod def _format_namesake(namesake: Namesake) -> Dict: return FluentDict() \ .put(ResponseField.namesake_name, namesake.name) \ .put(ResponseField.namesake_parents, list(map(ResponseFormatter._format_namesake_parent, namesake.parents))) \ .to_dict() @staticmethod def _format_namesake_parent(parent: NamesakeParent) -> Dict: return FluentDict() \ .put(ResponseField.namesake_parent_name, parent.name) \ .put(ResponseField.namesake_parent_level, parent.level) \ .to_dict() class GeoJson: def __init__(self): self.lon_list: List[float] = [] self.lat_list: List[float] = [] @staticmethod def parse_geometry(geometry_line: str) -> Union[Multipolygon, Polygon, GeoPoint]: geoJson = GeoJson() return geoJson._do_parse(geometry_line) @staticmethod def format_geometry(boundary: Boundary) -> str: if isinstance(boundary.geometry, GeoPoint): return json.dumps({ ResponseField.boundary_type.value: GeometryKind.point.value, ResponseField.boundary_coordinates.value: [boundary.geometry.lon, boundary.geometry.lat] }) if isinstance(boundary.geometry, Polygon): return json.dumps({ ResponseField.boundary_type.value: GeometryKind.polygon.value, ResponseField.boundary_coordinates.value: GeoJson._format_polygon(boundary.geometry) }) if isinstance(boundary.geometry, Multipolygon): return json.dumps({ ResponseField.boundary_type.value: GeometryKind.polygon.value, ResponseField.boundary_coordinates.value: [GeoJson._format_polygon(poly) for poly in boundary.geometry.polygons] }) @staticmethod def _format_polygon(polygon: Polygon): poly = [] for ring in polygon.rings: poly.append([[p.lon, p.lat] for p in ring.points]) return poly def _do_parse(self, geometry_line: str) -> Union[Multipolygon, Polygon, GeoPoint]: geometry_data: dict = json.loads(geometry_line) geometry_type: GeometryKind = GeometryKind(geometry_data[ResponseField.boundary_type.value]) if geometry_type == GeometryKind.point: return self._parse_point(geometry_data) if geometry_type == GeometryKind.polygon: return self._parse_polygon(geometry_data[ResponseField.boundary_coordinates.value]) if geometry_type == GeometryKind.multipolygon: return self._parse_multipolygon(geometry_data[ResponseField.boundary_coordinates.value]) raise ValueError('Invalid geometry type') def _parse_multipolygon(self, geometry_data: List[List[List[List[float]]]]) -> Multipolygon: return Multipolygon([self._parse_polygon(p) for p in geometry_data]) def _parse_polygon(self, geometry_data: List[List[List[float]]]) -> Polygon: rings: List[Ring] = [] for ring in geometry_data: rings.append(Ring([GeoPoint(p[0], p[1]) for p in ring])) return Polygon(rings) def _parse_point(self, geometry_data: dict) -> GeoPoint: return GeoPoint( lon=geometry_data[ResponseField.boundary_lon.value], lat=geometry_data[ResponseField.boundary_lat.value] ) def _add_point(self, lon: float, lat: float) -> None: self.lon_list.append(lon) self.lat_list.append(lat)