# metropolis/utils/data_classes.py
# Copyright (c) Facebook, Inc. and its affiliates.
# Original copyright notice:
# nuScenes dev-kit.
# Code written by Oscar Beijbom, 2018.
import copy
import os.path as osp
from abc import ABC, abstractmethod
from functools import reduce
from typing import Tuple, List, Dict, TYPE_CHECKING, Any, Optional
import numpy as np
from matplotlib.axes import ( # @manual=fbsource//third-party/pypi/matplotlib:matplotlib
Axes,
)
from pyquaternion import Quaternion
from . import pathmgr
from .geometry_utils import view_points, view_points_eq, transform_matrix, split_poly_eq
if TYPE_CHECKING:
from ..metropolis import Metropolis
# Shared identity matrices used as default `view` projections by the rendering
# methods below (4x4 for the PointCloud renderers, 3x3 for Box.render).
# NOTE: these are module-level singletons — callers must not mutate them.
EYE3 = np.eye(3)
EYE4 = np.eye(4)
class PointCloud(ABC):
    """Abstract class for manipulating and viewing point clouds.

    Every point cloud (lidar and radar) consists of points where:
    - Dimensions 0, 1, 2 represent x, y, z coordinates.
      These are modified when the point cloud is rotated or translated.
    - All other dimensions are optional. Hence these have to be manually modified if
      the reference frame changes.

    Args:
        points: d-dimensional input point cloud matrix, laid out as (d, n):
            one row per channel, one column per point.
    """

    def __init__(self, points: np.ndarray):
        # Points are stored column-wise; the row count must match the concrete
        # subclass' channel count.
        assert (
            points.shape[0] == self.nbr_dims()
        ), f"Error: Pointcloud points must have format: {self.nbr_dims()} x n"
        self.points = points

    @staticmethod
    @abstractmethod
    def nbr_dims() -> int:
        """Returns the number of dimensions.

        Returns: Number of dimensions.
        """
        pass

    @classmethod
    @abstractmethod
    def from_file(cls, file_name: str) -> "PointCloud":
        """Loads point cloud from disk.

        Args:
            file_name: Path of the pointcloud file on disk.

        Returns:
            PointCloud instance.
        """
        pass

    @classmethod
    def from_file_multisweep(
        cls,
        metr: "Metropolis",
        sample_rec: Dict[str, Any],
        chan: str,
        ref_chan: str,
        nsweeps: int = 5,
        min_distance: float = 1.0,
    ) -> Tuple["PointCloud", np.ndarray]:
        """Return a point cloud that aggregates multiple sweeps.

        As every sweep is in a different coordinate frame, we need to map the
        coordinates to a single reference frame. As every sweep has a different
        timestamp, we need to account for that in the transformations and timestamps.

        Args:
            metr: A Metropolis instance.
            sample_rec: The current sample.
            chan: The lidar/radar channel from which we track back n sweeps to
                aggregate the point cloud.
            ref_chan: The reference channel of the current sample_rec that the point
                clouds are mapped to.
            nsweeps: Number of sweeps to aggregated.
            min_distance: Distance below which points are discarded.

        Returns:
            all_pc: The aggregated point clouds.
            all_times: The aggregated timestamps, one time lag (in seconds,
                relative to the reference sweep) per point, shape (1, n).
        """
        # Init: start from an empty (d, 0) matrix. Lidar clouds are stored as
        # float32, any other cloud type as float64.
        points = np.zeros(
            (cls.nbr_dims(), 0),
            dtype=np.float32 if cls == LidarPointCloud else np.float64,
        )
        all_pc = cls(points)
        all_times = np.zeros((1, 0))
        # Get reference pose and timestamp.
        ref_sd_token = sample_rec["data"][ref_chan]
        ref_sd_rec = metr.get("sample_data", ref_sd_token)
        ref_pose_rec = metr.get("ego_pose", ref_sd_rec["ego_pose_token"])
        ref_cs_rec = metr.get(
            "calibrated_sensor", ref_sd_rec["calibrated_sensor_token"]
        )
        # Timestamps are stored in microseconds; convert to seconds.
        ref_time = 1e-6 * ref_sd_rec["timestamp"]
        # Homogeneous transform from ego car frame to reference frame.
        ref_from_car = transform_matrix(
            ref_cs_rec["translation"], Quaternion(ref_cs_rec["rotation"]), inverse=True
        )
        # Homogeneous transformation matrix from global to _current_ ego car frame.
        car_from_global = transform_matrix(
            ref_pose_rec["translation"],
            Quaternion(ref_pose_rec["rotation"]),
            inverse=True,
        )
        # Aggregate current and previous sweeps.
        sample_data_token = sample_rec["data"][chan]
        current_sd_rec = metr.get("sample_data", sample_data_token)
        for _ in range(nsweeps):
            # Load up the pointcloud and remove points close to the sensor.
            current_pc = cls.from_file(
                osp.join(metr.dataroot, current_sd_rec["filename"])
            )
            current_pc.remove_close(min_distance)
            # Get past pose.
            current_pose_rec = metr.get("ego_pose", current_sd_rec["ego_pose_token"])
            global_from_car = transform_matrix(
                current_pose_rec["translation"],
                Quaternion(current_pose_rec["rotation"]),
                inverse=False,
            )
            # Homogeneous transformation matrix from sensor coordinate frame to ego car frame.
            current_cs_rec = metr.get(
                "calibrated_sensor", current_sd_rec["calibrated_sensor_token"]
            )
            car_from_current = transform_matrix(
                current_cs_rec["translation"],
                Quaternion(current_cs_rec["rotation"]),
                inverse=False,
            )
            # Fuse four transformation matrices into one and perform transform.
            # Rightmost factor is applied first, so the chain is:
            # current sensor -> current ego -> global -> reference ego -> reference sensor.
            trans_matrix = reduce(
                np.dot,
                [ref_from_car, car_from_global, global_from_car, car_from_current],
            )
            current_pc.transform(trans_matrix)
            # Add time vector which can be used as a temporal feature.
            time_lag = (
                ref_time - 1e-6 * current_sd_rec["timestamp"]
            )  # Positive difference.
            times = time_lag * np.ones((1, current_pc.nbr_points()))
            all_times = np.hstack((all_times, times))
            # Merge with key pc.
            all_pc.points = np.hstack((all_pc.points, current_pc.points))
            # Abort if there are no previous sweeps.
            if current_sd_rec["previous_sample_data"] == "":
                break
            else:
                current_sd_rec = metr.get(
                    "sample_data", current_sd_rec["previous_sample_data"]
                )
        return all_pc, all_times

    def nbr_points(self) -> int:
        """Returns the number of points.

        Returns:
            Number of points.
        """
        return self.points.shape[1]

    def subsample(self, ratio: float) -> None:
        """Sub-samples the pointcloud in place.

        NOTE(review): np.random.choice is used with its default replace=True,
        so the kept points may contain duplicates — confirm this is intended.

        Args:
            ratio: Fraction to keep.
        """
        selected_ind = np.random.choice(
            np.arange(0, self.nbr_points()), size=int(self.nbr_points() * ratio)
        )
        self.points = self.points[:, selected_ind]

    def remove_close(self, radius: float) -> None:
        """Removes points too close to the origin in the x-y plane.

        Despite the parameter name, the region removed is an axis-aligned
        square, not a circle: a point is dropped when both |x| < radius and
        |y| < radius. The z coordinate is ignored.

        Args:
            radius: Threshold below which points are removed.
        """
        x_filt = np.abs(self.points[0, :]) < radius
        y_filt = np.abs(self.points[1, :]) < radius
        not_close = np.logical_not(np.logical_and(x_filt, y_filt))
        self.points = self.points[:, not_close]

    def translate(self, x: np.ndarray) -> None:
        """Applies a translation to the point cloud.

        Only the first three (x, y, z) channels are modified; any extra
        channels are left untouched.

        Args:
            x: Translation in x, y, z.
        """
        for i in range(3):
            self.points[i, :] = self.points[i, :] + x[i]

    def rotate(self, rot_matrix: np.ndarray) -> None:
        """Applies a rotation to the x, y, z channels.

        Args:
            rot_matrix: 3x3 rotation matrix.
        """
        self.points[:3, :] = np.dot(rot_matrix, self.points[:3, :])

    def transform(self, transf_matrix: np.ndarray) -> None:
        """Applies a homogeneous transform.

        The x, y, z channels are lifted to homogeneous coordinates (appending a
        row of ones), multiplied by the matrix, and the first three rows of the
        result are written back.

        Args:
            transf_matrix: Homogenous transformation matrix (typically 4x4).
        """
        self.points[:3, :] = transf_matrix.dot(
            np.vstack((self.points[:3, :], np.ones(self.nbr_points())))
        )[:3, :]

    def render_height(
        self,
        ax: Axes,
        view: np.ndarray = EYE4,
        x_lim: Tuple[float, float] = (-20, 20),
        y_lim: Tuple[float, float] = (-20, 20),
        marker_size: float = 1,
    ) -> None:
        """Very simple method that applies a transformation and then scatter plots
        the points colored by height (z-value, channel 2).

        Args:
            ax: Axes on which to render the points.
            view: Defines an arbitrary projection (n <= 4).
            x_lim: (min, max). x range for plotting.
            y_lim: (min, max). y range for plotting.
            marker_size: Marker size.
        """
        self._render_helper(2, ax, view, x_lim, y_lim, marker_size)

    def render_intensity(
        self,
        ax: Axes,
        view: np.ndarray = EYE4,
        x_lim: Tuple[float, float] = (-20, 20),
        y_lim: Tuple[float, float] = (-20, 20),
        marker_size: float = 1,
    ) -> None:
        """Very simple method that applies a transformation and then scatter plots
        the points colored by intensity (channel 3; valid for lidar clouds).

        Args:
            ax: Axes on which to render the points.
            view: Defines an arbitrary projection (n <= 4).
            x_lim: (min, max).
            y_lim: (min, max).
            marker_size: Marker size.
        """
        self._render_helper(3, ax, view, x_lim, y_lim, marker_size)

    def _render_helper(
        self,
        color_channel: int,
        ax: Axes,
        view: np.ndarray,
        x_lim: Tuple[float, float],
        y_lim: Tuple[float, float],
        marker_size: float,
    ) -> None:
        """Helper function for rendering.

        Args:
            color_channel: Point channel to use as color.
            ax: Axes on which to render the points.
            view: Defines an arbitrary projection (n <= 4).
            x_lim: (min, max).
            y_lim: (min, max).
            marker_size: Marker size.
        """
        # Project to 2D without perspective division; color by the requested channel.
        points = view_points(self.points[:3, :], view, normalize=False)
        ax.scatter(
            points[0, :], points[1, :], c=self.points[color_channel, :], s=marker_size
        )
        ax.set_xlim(x_lim[0], x_lim[1])
        ax.set_ylim(y_lim[0], y_lim[1])
class LidarPointCloud(PointCloud):
    """Concrete point cloud with four channels: x, y, z and intensity."""

    @staticmethod
    def nbr_dims() -> int:
        """Returns the number of dimensions.

        Returns:
            Number of dimensions (4: x, y, z, intensity).
        """
        return 4

    @classmethod
    def from_file(cls, file_name: str) -> "LidarPointCloud":
        """Loads LIDAR data from binary numpy format. Data is stored as (x, y, z,
        intensity, ring index).

        Args:
            file_name: Path of the pointcloud file on disk (must be ``.npz``).

        Returns:
            LidarPointCloud instance (x, y, z, intensity).
        """
        assert file_name.endswith(".npz"), f"Unsupported filetype {file_name}"
        # Read the (n, 3) xyz coordinates from the archive.
        with pathmgr.open(file_name, "rb") as fid:
            xyz = np.load(fid)["points"]
        # Pad with a constant intensity of 1 to obtain (n, 4), then transpose
        # to the (4, n) column-wise layout expected by PointCloud.
        homogeneous = np.ones((xyz.shape[0], 4))
        homogeneous[:, :3] = xyz
        return cls(homogeneous.T)
class Box:
    """Simple data class representing a 3d box

    Args:
        center: Center of box given as x, y, z.
        size: Size of box in width, length, height.
        orientation: Box orientation.
        name: Box name, optional. Can be used e.g. for denote category name.
        token: Unique string identifier from DB.
    """

    def __init__(
        self,
        center: List[float],
        size: List[float],
        orientation: Quaternion,
        name: Optional[str] = None,
        token: Optional[str] = None,
    ):
        assert not np.any(np.isnan(center))
        assert not np.any(np.isnan(size))
        assert len(center) == 3
        assert len(size) == 3
        # isinstance instead of an exact type() comparison so that Quaternion
        # subclasses are accepted as well.
        assert isinstance(orientation, Quaternion)
        self.center = np.array(center)
        self.lwh = np.array(size)
        self.orientation = orientation
        self.name = name
        self.token = token

    def __eq__(self, other):
        # Equality compares geometry only (center, size, orientation, within
        # floating-point tolerance); `name` and `token` are ignored.
        if not isinstance(other, Box):
            # Defer to the other operand instead of raising AttributeError.
            return NotImplemented
        center = np.allclose(self.center, other.center)
        lwh = np.allclose(self.lwh, other.lwh)
        orientation = np.allclose(self.orientation.elements, other.orientation.elements)
        return center and lwh and orientation

    def __repr__(self):
        repr_str = (
            "xyz: [{:.2f}, {:.2f}, {:.2f}], lwh: [{:.2f}, {:.2f}, {:.2f}], "
            "rot axis: [{:.2f}, {:.2f}, {:.2f}], ang(degrees): {:.2f}, ang(rad): {:.2f}, "
            "name: {}, token: {}"
        )
        return repr_str.format(
            self.center[0],
            self.center[1],
            self.center[2],
            self.lwh[0],
            self.lwh[1],
            self.lwh[2],
            self.orientation.axis[0],
            self.orientation.axis[1],
            self.orientation.axis[2],
            self.orientation.degrees,
            self.orientation.radians,
            self.name,
            self.token,
        )

    @property
    def rotation_matrix(self) -> np.ndarray:
        """Return a rotation matrix.

        Returns:
            The box's rotation matrix.
        """
        return self.orientation.rotation_matrix

    def translate(self, x: np.ndarray) -> None:
        """Applies a translation.

        Args:
            x: Translation in x, y, z direction.
        """
        self.center += x

    def rotate(self, quaternion: Quaternion) -> None:
        """Rotates box.

        The center is rotated about the origin and the rotation is composed
        onto the current orientation.

        Args:
            quaternion: Rotation to apply.
        """
        self.center = np.dot(quaternion.rotation_matrix, self.center)
        self.orientation = quaternion * self.orientation

    def corners(self, lwh_factor: float = 1.0) -> np.ndarray:
        """Returns the bounding box corners.

        Args:
            lwh_factor: Multiply l, w, h by a factor to scale the box.

        Returns:
            (3, 8) corner coordinates. First four corners are the ones facing
            forward. The last four are the ones facing backwards.
        """
        l, w, h = self.lwh * lwh_factor
        # 3D bounding box corners. (Convention: x points right, y to the front, z up.)
        y_corners = l / 2 * np.array([1, 1, 1, 1, -1, -1, -1, -1])
        x_corners = w / 2 * np.array([-1, 1, 1, -1, -1, 1, 1, -1])
        z_corners = h / 2 * np.array([1, 1, -1, -1, 1, 1, -1, -1])
        corners = np.vstack((x_corners, y_corners, z_corners))
        # Rotate
        corners = np.dot(self.orientation.rotation_matrix, corners)
        # Translate
        x, y, z = self.center
        corners[0, :] = corners[0, :] + x
        corners[1, :] = corners[1, :] + y
        corners[2, :] = corners[2, :] + z
        return corners

    def bottom_corners(self) -> np.ndarray:
        """Returns the four bottom corners.

        Returns:
            (3, 4) bottom corners. First two face forward, last two face backwards.
        """
        return self.corners()[:, [2, 3, 7, 6]]

    def render(
        self,
        axis: Axes,
        view: np.ndarray = EYE3,
        normalize: bool = False,
        colors: Tuple[Any, Any, Any] = ("b", "r", "k"),
        linewidth: float = 2,
    ) -> None:
        """Renders the box in the provided Matplotlib axis.

        Args:
            axis: Axis onto which the box should be drawn.
            view: Define a projection in needed (e.g. for drawing projection in an image).
            normalize: Whether to normalize the remaining coordinate.
            colors: Valid Matplotlib colors (<str> or normalized RGB tuple) for front,
                back and sides.
            linewidth: Width in pixel of the box sides.
        """
        corners = view_points(self.corners(), view, normalize=normalize)[:2, :]

        def draw_rect(selected_corners, color):
            # Draw a closed polygon through the given 2D corners.
            prev = selected_corners[-1]
            for corner in selected_corners:
                axis.plot(
                    [prev[0], corner[0]],
                    [prev[1], corner[1]],
                    color=color,
                    linewidth=linewidth,
                )
                prev = corner

        # Draw the sides
        for i in range(4):
            axis.plot(
                [corners.T[i][0], corners.T[i + 4][0]],
                [corners.T[i][1], corners.T[i + 4][1]],
                color=colors[2],
                linewidth=linewidth,
            )
        # Draw front (first 4 corners) and rear (last 4 corners) rectangles(3d)/lines(2d)
        draw_rect(corners.T[:4], colors[0])
        draw_rect(corners.T[4:], colors[1])
        # Draw line indicating the front
        center_bottom_forward = np.mean(corners.T[2:4], axis=0)
        center_bottom = np.mean(corners.T[[2, 3, 7, 6]], axis=0)
        axis.plot(
            [center_bottom[0], center_bottom_forward[0]],
            [center_bottom[1], center_bottom_forward[1]],
            color=colors[0],
            linewidth=linewidth,
        )

    def render_eq(
        self,
        axis: Axes,
        img_size: Tuple[int, int],
        colors: Tuple[Any, Any, Any] = ("b", "r", "k"),
        linewidth: float = 2,
        num_samples: int = 20,
    ) -> None:
        """Renders the box in the provided Matplotlib axis, using an equirectangular
        projection.

        Args:
            axis: Axis onto which the box should be drawn.
            img_size: Image size as (width, height).
            colors: Valid Matplotlib colors (<str> or normalized RGB tuple) for front,
                back and sides.
            linewidth: Width in pixel of the box sides.
            num_samples: Number of points to sample on each edge of the bounding box
        """
        t = np.linspace(0, 1, num_samples).reshape(1, -1)
        corners = self.corners()

        def draw_line(p1, p2, color):
            # Sample num_samples points along the 3D edge, project them to the
            # equirectangular image, and draw each wrap-around segment separately.
            line = p1.reshape(3, -1) + t * (p2 - p1).reshape(3, -1)
            line = view_points_eq(line, img_size[0], img_size[1])
            for segment in split_poly_eq(line, img_size[0]):
                axis.plot(segment[0, :], segment[1, :], color=color, linewidth=linewidth)

        # Draw the sides
        for i in range(4):
            draw_line(corners[:, i], corners[:, i + 4], colors[2])
        # Draw front and back
        for i in range(4):
            draw_line(corners[:, i % 4], corners[:, (i + 1) % 4], colors[0])
        for i in range(4):
            draw_line(corners[:, i % 4 + 4], corners[:, (i + 1) % 4 + 4], colors[1])

    def copy(self) -> "Box":
        """Create a copy of self.

        Returns:
            A copy.
        """
        return copy.deepcopy(self)
class Box2d:
    """Simple data class representing a 2d box

    This represents an axis-aligned box on an equirectangular image, meaning that
    the same region on a side-view perspective image will be a deformed rectangle.

    Args:
        coords: Bounding box coordinates [left, top, right, bottom]. Note that for
            boxes that "wrap around" the equirectangular image, left > right, while
            for all others left < right.
        name: Box name, optional. Can be used e.g. for denote category name.
        token: Unique string identifier from DB.
    """

    def __init__(
        self,
        coords: List[float],
        name: Optional[str] = None,
        token: Optional[str] = None,
    ):
        assert not np.any(np.isnan(coords))
        assert len(coords) == 4
        self.coords = coords
        self.name = name
        self.token = token

    def __eq__(self, other) -> bool:
        # Equality compares coordinates only (within floating-point tolerance);
        # `name` and `token` are ignored.
        return np.allclose(self.coords, other.coords)

    def __repr__(self) -> str:
        return "[{:.2f}, {:.2f}, {:.2f}, {:.2f}], name={}, token={}".format(
            self.coords[0],
            self.coords[1],
            self.coords[2],
            self.coords[3],
            self.name,
            self.token,
        )

    def render(
        self, axis: Axes, width: int, color: Any = "r", linewidth: int = 2
    ) -> None:
        """Renders the box in the provided Matplotlib axis.

        Args:
            axis: Axis onto which the box should be drawn.
            width: Width of the equirectangular image.
            color: Valid Matplotlib color.
            linewidth: Width in pixel of the box sides.
        """
        if self.coords[0] < self.coords[2]:
            # Regular box: four sides, each as an (x-pair, y-pair) segment.
            segments = [
                ([self.coords[0], self.coords[2]], [self.coords[1], self.coords[1]]),
                ([self.coords[2], self.coords[2]], [self.coords[1], self.coords[3]]),
                ([self.coords[2], self.coords[0]], [self.coords[3], self.coords[3]]),
                ([self.coords[0], self.coords[0]], [self.coords[3], self.coords[1]]),
            ]
        else:
            # Wrap-around box (left > right): the top and bottom sides are split
            # at the image border, so six segments are needed in total.
            segments = [
                ([self.coords[0], width], [self.coords[1], self.coords[1]]),
                ([0, self.coords[2]], [self.coords[1], self.coords[1]]),
                ([self.coords[2], self.coords[2]], [self.coords[1], self.coords[3]]),
                ([self.coords[2], 0], [self.coords[3], self.coords[3]]),
                ([width, self.coords[0]], [self.coords[3], self.coords[3]]),
                ([self.coords[0], self.coords[0]], [self.coords[3], self.coords[1]]),
            ]
        for x, y in segments:
            axis.plot(x, y, color=color, linewidth=linewidth)
class EquiBox2d:
    """2D bounding box on an equirectangular image reprojected to a perspective image

    Args:
        points: 2D points defining the box contour, as a (2, n) array.
        name: Box name, optional. Can be used e.g. for denote category name.
        token: Unique string identifier from DB.
    """

    def __init__(
        self,
        points: np.ndarray,
        name: Optional[str] = None,
        token: Optional[str] = None,
    ):
        assert points.shape[0] == 2
        self.points = points
        self.name = name
        self.token = token

    def render(self, axis: Axes, color: Any = "r", linewidth: int = 2) -> None:
        """Renders the box in the provided Matplotlib axis.

        Args:
            axis: Axis onto which the box should be drawn.
            color: Valid Matplotlib color.
            linewidth: Width in pixel of the box sides.
        """
        axis.plot(self.points[0], self.points[1], color=color, linewidth=linewidth)

    @classmethod
    def from_box_2d(
        cls,
        box: Box2d,
        q_eq: Quaternion,
        q_pr: Quaternion,
        intrinsic: np.ndarray,
        size_eq: Tuple[int, int],
        size_pr: Tuple[int, int],
        num_samples: int = 20,
    ) -> Optional["EquiBox2d"]:
        """Project an equirectangular bounding box to a projective image

        This assumes that both cameras have the same center, being related by a
        simple rotation transformation. The output box is constructed by projecting
        a fixed number of points on each edge of the input box from the
        equirectangular to the perspective image, forming a discrete approximation
        of its curved shape.

        Args:
            box: A bounding box defined in the equirectangular image.
            q_eq: The rotation of the equirectangular image with respect to an
                external frame of reference (e.g the ego vehicle).
            q_pr: The rotation of the projective image with respect to the same
                external frame of reference.
            intrinsic: Intrinsic matrix of the projective image.
            size_eq: The size of the equirectangular image, as width, height
            size_pr: The size of the projective image, as width, height
            num_samples: Number of points to sample on each edge of the bounding box

        Returns:
            The projected box, or None if the given box falls completely outside
            of the projective image.
        """

        def get_points(x0, y0, x1, y1):
            # Sample num_samples points along each of the four rectangle edges
            # (top, right, bottom, left, traversed in order) and concatenate
            # them into a single (2, 4 * num_samples) contour.
            side1 = np.stack(
                [
                    np.linspace(x0, x1, num_samples, dtype=np.float32),
                    np.full((num_samples,), y0, dtype=np.float32),
                ],
                axis=0,
            )
            side2 = np.stack(
                [
                    np.full((num_samples,), x1, dtype=np.float32),
                    np.linspace(y0, y1, num_samples, dtype=np.float32),
                ],
                axis=0,
            )
            side3 = np.stack(
                [
                    np.linspace(x1, x0, num_samples, dtype=np.float32),
                    np.full((num_samples,), y1, dtype=np.float32),
                ],
                axis=0,
            )
            side4 = np.stack(
                [
                    np.full((num_samples,), x0, dtype=np.float32),
                    np.linspace(y1, y0, num_samples, dtype=np.float32),
                ],
                axis=0,
            )
            return np.concatenate([side1, side2, side3, side4], axis=1)

        def project(points):
            # From pixel coordinates to angles: u spans [-pi, pi) across the
            # image width, v spans [-pi/2, pi/2) across the height.
            u = (points[0] / size_eq[0] - 0.5) * 2 * np.pi
            v = (points[1] / size_eq[1] - 0.5) * np.pi
            # From angles to 3D coordinates on the sphere
            p_eq = np.stack(
                [np.cos(v) * np.sin(u), np.sin(v), np.cos(v) * np.cos(u)], axis=0
            )
            # Rotate to target camera
            p_pr = np.dot(q_pr.rotation_matrix.T, np.dot(q_eq.rotation_matrix, p_eq))
            # Filter out points that end up behind the camera. NOTE: this can
            # drop columns, so the output may hold fewer than 4 * num_samples
            # points.
            p_pr = p_pr[:, p_pr[2] > 0]
            # Apply camera intrinsics and project
            p_pr = np.dot(intrinsic, p_pr)
            p_pr = np.stack([p_pr[0] / p_pr[2], p_pr[1] / p_pr[2]], axis=0)
            return p_pr

        # Extract and normalize the input box coordinates: for wrap-around boxes
        # (left > right) shift the right edge by one full image width so that
        # x0 < x1 always holds.
        x0, y0, x1, y1 = box.coords
        if x0 > x1:
            x1 += size_eq[0]
        # "draw" the box with num_samples on each side
        points = get_points(x0, y0, x1, y1)
        # Project to the perspective image
        points = project(points)
        # Filter out points that end up outside of the image
        valid = points[0] >= 0
        valid = np.logical_and(valid, points[1] >= 0)
        valid = np.logical_and(valid, points[0] < size_pr[0])
        valid = np.logical_and(valid, points[1] < size_pr[1])
        if not valid.any():
            return None
        # NOTE(review): the out-of-image filter below is left disabled, so the
        # returned contour keeps points outside the image — presumably to keep
        # the contour continuous when plotting. Confirm this is intentional.
        # points = points[:, valid]
        return cls(points, name=box.name, token=box.token)