python-package/lets_plot/geo_data/to_geo_data_frame.py (107 lines of code) (raw):

from typing import List import shapely from geopandas import GeoDataFrame from pandas import DataFrame from shapely.geometry import box from lets_plot.geo_data import PlacesDataFrameBuilder, abstractmethod from lets_plot.geo_data.geocodes import _zip_answers from lets_plot.geo_data.gis.request import RegionQuery, LevelKind from lets_plot.geo_data.gis.response import Answer, GeocodedFeature, GeoRect, Boundary, Multipolygon, Polygon, GeoPoint ShapelyPoint = shapely.geometry.Point ShapelyLinearRing = shapely.geometry.LinearRing ShapelyPolygon = shapely.geometry.Polygon ShapelyMultiPolygon = shapely.geometry.MultiPolygon def _create_geo_data_frame(data, geometry) -> DataFrame: return GeoDataFrame( data, crs='EPSG:4326', geometry=geometry ) class RectGeoDataFrame: def __init__(self): super().__init__() self._lonmin: List[float] = [] self._latmin: List[float] = [] self._lonmax: List[float] = [] self._latmax: List[float] = [] def to_data_frame(self, answers: List[Answer], queries: List[RegionQuery], level_kind: LevelKind) -> DataFrame: assert len(answers) == len(queries) places = PlacesDataFrameBuilder(level_kind) for query, answer in _zip_answers(queries, answers): for feature in answer.features: rects: List[GeoRect] = self._read_rect(feature) for rect in rects: places.append_row(query, feature) self._lonmin.append(rect.start_lon) self._latmin.append(rect.min_lat) self._lonmax.append(rect.end_lon) self._latmax.append(rect.max_lat) geometry = [ box(lmt[0], lmt[1], lmt[2], lmt[3]) for lmt in zip(self._lonmin, self._latmin, self._lonmax, self._latmax) ] return _create_geo_data_frame(places.build_dict(), geometry=geometry) def _read_rect(self, feature: GeocodedFeature) -> List[GeoRect]: rect: GeoRect = self._select_rect(feature) if rect.crosses_antimeridian(): return [ GeoRect(start_lon=rect.start_lon, end_lon=180., min_lat=rect.min_lat, max_lat=rect.max_lat), GeoRect(start_lon=-180., end_lon=rect.end_lon, min_lat=rect.min_lat, max_lat=rect.max_lat) ] else: return [rect] @abstractmethod def _select_rect(self, feature: GeocodedFeature) -> GeoRect: pass class CentroidsGeoDataFrame: def __init__(self): super().__init__() self._lons: List[float] = [] self._lats: List[float] = [] def to_data_frame(self, answers: List[Answer], queries: List[RegionQuery], level_kind: LevelKind) -> DataFrame: places = PlacesDataFrameBuilder(level_kind) for query, answer in _zip_answers(queries, answers): for feature in answer.features: places.append_row(query, feature) self._lons.append(feature.centroid.lon) self._lats.append(feature.centroid.lat) geometry = [ShapelyPoint(pnt[0], pnt[1]) for pnt in zip(self._lons, self._lats)] return _create_geo_data_frame(places.build_dict(), geometry) class BoundariesGeoDataFrame: def __init__(self): super().__init__() def to_data_frame(self, answers: List[Answer], queries: List[RegionQuery], level_kind: LevelKind) -> DataFrame: places = PlacesDataFrameBuilder(level_kind) geometry = [] for query, answer in _zip_answers(queries, answers): for feature in answer.features: places.append_row(query, feature) geometry.append(self._geo_parse_geometry(feature.boundary)) return _create_geo_data_frame(places.build_dict(), geometry=geometry) def _geo_parse_geometry(self, boundary: Boundary): geometry = boundary.geometry if isinstance(geometry, GeoPoint): return self._geo_parse_point(geometry) if isinstance(geometry, Polygon): return self._geo_parse_polygon(geometry) if isinstance(geometry, Multipolygon): return self._geo_parse_multipolygon(geometry) raise ValueError('Invalid geometry type') def _geo_parse_multipolygon(self, geometry: Multipolygon) -> ShapelyMultiPolygon: geo_polygons: List[ShapelyPolygon] = [self._geo_parse_polygon(polygon) for polygon in geometry.polygons] return ShapelyMultiPolygon(geo_polygons) def _geo_parse_polygon(self, polygon: Polygon) -> ShapelyPolygon: geo_rings: List[ShapelyLinearRing] = [ ShapelyLinearRing([(p.lon, p.lat) for p in ring.points]) for ring in polygon.rings ] if len(geo_rings) == 0: return ShapelyPolygon() else: return ShapelyPolygon(shell=geo_rings[0], holes=geo_rings[1:]) def _geo_parse_point(self, geometry_data: GeoPoint) -> ShapelyPoint: return ShapelyPoint((geometry_data.lon, geometry_data.lat)) class LimitsGeoDataFrame(RectGeoDataFrame): def _select_rect(self, feature: GeocodedFeature) -> GeoRect: return feature.limit class PositionsGeoDataFrame(RectGeoDataFrame): def _select_rect(self, feature: GeocodedFeature) -> GeoRect: return feature.position