scripts/generate_maskrcnn.py [17:381]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
from typing import cast, Iterable, Union, List, Tuple, NamedTuple, Optional, Dict

import numpy as np
import torch
from PIL import Image
import tqdm
from allennlp.data import TorchImageLoader
from torch.utils.data import IterableDataset, DataLoader

from grolp.envs.thor_env import ThorEnv
import torchvision.ops.boxes as box_ops
import torch.nn.functional as F

TRAJ_DATA_JSON_FILENAME = "traj_data.json"

render_settings = dict()
render_settings['renderImage'] = True
render_settings['renderDepthImage'] = False
render_settings['renderObjectImage'] = False
render_settings['renderClassImage'] = False


class MaskDetectorOutput(NamedTuple):
    """
    The output type from the forward pass of a `RegionDetector`.
    """

    box_features: List[Tensor]
    """
    A list of tensors, each with shape `(num_boxes, feature_dim)`.
    """

    boxes: List[Tensor]
    """
    A list of tensors containing the coordinates for each box. Each has shape `(num_boxes, 4)`.
    """

    masks: List[Tensor]

    class_probs: Optional[List[Tensor]] = None
    """
    An optional list of tensors. These tensors can have shape `(num_boxes,)` or
    `(num_boxes, *)` if probabilities for multiple classes are given.
    """

    class_labels: Optional[List[Tensor]] = None
    """
    An optional list of tensors that give the labels corresponding to the `class_probs`
    tensors. This should be non-`None` whenever `class_probs` is, and each tensor
    should have the same shape as the corresponding tensor from `class_probs`.
    """


class MaskRCNNDetector(torch.nn.Module):
    """
    !!! Note
        This module does not have any trainable parameters by default.
        All pretrained weights are frozen.

    # Parameters

    box_score_thresh : `float`, optional (default = `0.05`)
        During inference, only proposal boxes / regions with a label classification score
        greater than `box_score_thresh` will be returned.

    box_nms_thresh : `float`, optional (default = `0.5`)
        During inference, non-maximum suppression (NMS) will be applied to groups of boxes
        that share a common label.

        NMS iteratively removes lower scoring boxes which have an intersection-over-union (IoU)
        greater than `box_nms_thresh` with another higher scoring box.

    max_boxes_per_image : `int`, optional (default = `100`)
        During inference, at most `max_boxes_per_image` boxes will be returned. The
        number of boxes returned will vary by image and will often be lower
        than `max_boxes_per_image` depending on the values of `box_score_thresh`
        and `box_nms_thresh`.

    checkpoint_path : `str`, optional (default = `None`)
        If specified, a fine-tuned MaskRCNN checkpoint is loaded from this path instead of the
        pretrained Torchvision model.
    """

    def __init__(
            self,
            *,
            box_score_thresh: float = 0.05,
            box_nms_thresh: float = 0.5,
            max_boxes_per_image: int = 100,
            checkpoint_path: Optional[str] = None,
            device="cpu"
    ):
        super().__init__()

        if checkpoint_path is None:
            self.detector = torchvision.models.detection.maskrcnn_resnet50_fpn(
                pretrained=True,
                box_score_thresh=box_score_thresh,
                box_nms_thresh=box_nms_thresh,
                box_detections_per_img=max_boxes_per_image,
            )
        else:

            if "moca" in checkpoint_path:
                maskrcnn = MaskRCNN(num_classes=119, hidden_size=256,
                                    inference_params=dict(box_score_thresh=box_score_thresh,
                                                          box_nms_thresh=box_nms_thresh,
                                                          box_detections_per_img=max_boxes_per_image))

                state_dict = torch.load(checkpoint_path, map_location="cpu")

                new_state_dict = {"detector." + k: v for k, v in state_dict.items()}

                maskrcnn.load_state_dict(new_state_dict)
                self.detector = maskrcnn.detector
            else:
                self.detector = MaskRCNN.load_from_checkpoint(
                    checkpoint_path,
                    inference_params=dict(box_score_thresh=box_score_thresh,
                                          box_nms_thresh=box_nms_thresh,
                                          box_detections_per_img=max_boxes_per_image)
                )
                # unwrap to get the underlying torchvision MaskRCNN module
                self.detector = self.detector.detector

        # Freeze all weights.
        for parameter in self.detector.parameters():
            parameter.requires_grad = False
        self.detector.eval()

    def forward(
            self,
            images: torch.FloatTensor
    ) -> List[Dict[str, torch.Tensor]]:
        """
        Extract regions and region features from the given images.

        The `images` should be standardized and resized using the default settings
        for the `TorchImageLoader`.
        """
        if self.detector.training:
            raise RuntimeError(
                "MaskRcnnRegionDetector can not be used for training at the moment"
            )

        original_image_sizes = torch.jit.annotate(List[Tuple[int, int]], [])
        for img in images:
            val = img.shape[-2:]
            assert len(val) == 2
            original_image_sizes.append((val[0], val[1]))

        images, targets = self.detector.transform(images)

        image_features = self.detector.backbone(images.tensors)
        if isinstance(image_features, torch.Tensor):
            image_features = collections.OrderedDict([('0', image_features)])

        # `proposals` is a list of tensors, one tensor per image, each representing a
        # fixed number of proposed regions/boxes.
        # shape (proposals[i]): (proposals_per_image, 4)
        proposals: List[Tensor]
        proposals, _ = self.detector.rpn(images, image_features)

        # shape: (batch_size * proposals_per_image, *)
        box_features = self.detector.roi_heads.box_roi_pool(image_features, proposals, images.image_sizes)

        # shape: (batch_size * proposals_per_image, *)
        box_features = self.detector.roi_heads.box_head(box_features)

        # shape (class_logits): (batch_size * proposals_per_image, num_classes)
        # shape (box_regression): (batch_size * proposals_per_image, regression_output_size)
        class_logits, box_regression = self.detector.roi_heads.box_predictor(box_features)

        # This step filters down the `proposals` to only detections that reach
        # a certain threshold.
        # Each of these is a list of tensors, one for each image in the batch.
        # shape (boxes[i]): (num_predicted_boxes, 4)
        # shape (features[i]): (num_predicted_boxes, feature_size)
        # shape (scores[i]): (num_predicted_boxes,)
        # shape (labels[i]): (num_predicted_boxes,)
        boxes, box_features, scores, labels = self._postprocess_detections(
            class_logits, box_features, box_regression, proposals, images.image_sizes
        )

        num_images = len(boxes)
        result = torch.jit.annotate(List[Dict[str, torch.Tensor]], [])
        for i in range(num_images):
            result.append(
                {
                    "features": box_features[i],
                    "boxes": boxes[i],
                    "labels": labels[i],
                    "scores": scores[i],
                }
            )

        # compute masks as well
        mask_proposals = boxes
        mask_features = self.detector.roi_heads.mask_roi_pool(image_features, mask_proposals, images.image_sizes)
        mask_features = self.detector.roi_heads.mask_head(mask_features)
        mask_logits = self.detector.roi_heads.mask_predictor(mask_features)

        labels = [r["labels"] for r in result]
        masks_probs = maskrcnn_inference(mask_logits, labels)
        for mask_prob, r in zip(masks_probs, result):
            r["masks"] = mask_prob

        detections = self.detector.transform.postprocess(result, images.image_sizes, original_image_sizes)

        return detections

    def _postprocess_detections(
            self,
            class_logits: Tensor,
            box_features: Tensor,
            box_regression: Tensor,
            proposals: List[Tensor],
            image_shapes: List[Tuple[int, int]],
    ) -> Tuple[List[Tensor], List[Tensor], List[Tensor], List[Tensor]]:
        """
        Adapted from https://github.com/pytorch/vision/blob/
        4521f6d152875974e317fa247a633e9ad1ea05c8/torchvision/models/detection/roi_heads.py#L664.

        The only reason we have to re-implement this method is so we can pull out the box
        features that we want.
        """
        device = class_logits.device
        num_classes = class_logits.shape[-1]

        boxes_per_image = [boxes_in_image.shape[0] for boxes_in_image in proposals]

        # shape: (batch_size * boxes_per_image, num_classes, 4)
        pred_boxes = self.detector.roi_heads.box_coder.decode(box_regression, proposals)

        pred_scores = F.softmax(class_logits, -1)

        pred_boxes_list = pred_boxes.split(boxes_per_image, 0)
        features_list = box_features.split(boxes_per_image, dim=0)
        pred_scores_list = pred_scores.split(boxes_per_image, 0)

        all_boxes = []
        all_features = []
        all_scores = []
        all_labels = []
        for boxes, features, scores, image_shape in zip(
                pred_boxes_list, features_list, pred_scores_list, image_shapes
        ):
            # shape: (boxes_per_image, num_classes, 4)
            boxes = box_ops.clip_boxes_to_image(boxes, image_shape)

            # shape: (boxes_per_image, num_classes, feature_size)
            features = features.unsqueeze(1).expand(boxes.shape[0], boxes.shape[1], -1)

            # create labels for each prediction
            # shape: (num_classes,)
            labels = torch.arange(num_classes, device=device)
            # shape: (boxes_per_image, num_classes,)
            labels = labels.view(1, -1).expand_as(scores)

            # remove predictions with the background label
            # shape: (boxes_per_image, num_classes - 1, 4)
            boxes = boxes[:, 1:]
            # shape: (boxes_per_image, num_classes - 1, feature_size)
            features = features[:, 1:]
            # shape: (boxes_per_image, num_classes - 1,)
            scores = scores[:, 1:]
            # shape: (boxes_per_image, num_classes - 1,)
            labels = labels[:, 1:]

            # batch everything, by making every class prediction be a separate instance
            # shape: (boxes_per_image * (num_classes - 1), 4)
            boxes = boxes.reshape(-1, 4)
            # shape: (boxes_per_image * (num_classes - 1), feature_size)
            features = features.reshape(boxes.shape[0], -1)
            # shape: (boxes_per_image * (num_classes - 1),)
            scores = scores.reshape(-1)
            # shape: (boxes_per_image * (num_classes - 1),)
            labels = labels.reshape(-1)

            # remove low scoring boxes
            inds = torch.where(scores > self.detector.roi_heads.score_thresh)[0]
            boxes, features, scores, labels = (
                boxes[inds],
                features[inds],
                scores[inds],
                labels[inds],
            )

            # remove empty boxes
            keep = box_ops.remove_small_boxes(boxes, min_size=1e-2)
            boxes, features, scores, labels = (
                boxes[keep],
                features[keep],
                scores[keep],
                labels[keep],
            )

            # non-maximum suppression, independently done per class
            keep = box_ops.batched_nms(boxes, scores, labels, self.detector.roi_heads.nms_thresh)
            # keep only topk scoring predictions
            keep = keep[: self.detector.roi_heads.detections_per_img]
            boxes, features, scores, labels = (
                boxes[keep],
                features[keep],
                scores[keep],
                labels[keep],
            )

            all_boxes.append(boxes)
            all_features.append(features)
            all_scores.append(scores)
            all_labels.append(labels)

        return all_boxes, all_features, all_scores, all_labels


class CustomImageLoader(TorchImageLoader):
    def __init__(self, *,
                 image_backend: Optional[str] = None,
                 size_divisibility: int = 32,
                 **kwargs):
        super().__init__(image_backend=image_backend, size_divisibility=size_divisibility, **kwargs)
        self.transforms = transforms.Compose([
            transforms.ToTensor()
        ])

    def load(self, image):
        return self.transforms(image)

    def __call__(self, image_or_images: Union[Image.Image, Iterable[Image.Image]], pack=False):
        if not isinstance(image_or_images, (list, tuple)):
            image, size = self([image_or_images])
            return image[0], size[0]

        images: List[torch.FloatTensor] = []
        sizes: List[torch.IntTensor] = []
        for image in image_or_images:
            image = self.load(image).to(self.device)
            size = cast(
                torch.IntTensor,
                torch.tensor(
                    [image.shape[-2], image.shape[-1]], dtype=torch.int32, device=self.device
                ),
            )
            images.append(image)
            sizes.append(size)

        if pack:
            return torch.stack(images), torch.stack(sizes)

        return images, sizes


def create_panorama(env, rotation_steps):
    initial_agent = env.last_event.metadata["agent"]
    curr_image = Image.fromarray(env.last_event.frame)
    panorama_frames = [curr_image]
    camera_info = [dict(
        h_view_angle=env.last_event.metadata["agent"]["rotation"]["y"],
        # flip the sign of the camera horizon (pitch): negative will be down and positive will be up
        v_view_angle=-env.last_event.metadata["agent"]["cameraHorizon"]
    )]
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
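
Illustrative usage of the duplicated classes (a minimal sketch, not part of either script; it assumes the imports above the excerpted range, a hypothetical image file `example_frame.png`, and CPU inference):

import torch
from PIL import Image

# Assumes MaskRCNNDetector and CustomImageLoader as defined in the excerpt above.
loader = CustomImageLoader()
detector = MaskRCNNDetector(box_score_thresh=0.05, box_nms_thresh=0.5, max_boxes_per_image=100)

frame = Image.open("example_frame.png").convert("RGB")  # hypothetical input image
images, _sizes = loader([frame])

with torch.no_grad():
    detections = detector(images)

# Each entry is a dict with "boxes", "labels", "scores", "masks" and "features".
for det in detections:
    print(det["boxes"].shape, det["masks"].shape, det["features"].shape)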



scripts/generate_maskrcnn_horizon0.py [17:382]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
from typing import cast, Iterable, Union, List, Tuple, NamedTuple, Optional, Dict

import numpy as np
import torch
from PIL import Image
import tqdm
from allennlp.data import TorchImageLoader
from torch.utils.data import IterableDataset, DataLoader

from grolp.envs.thor_env import ThorEnv
import torchvision.ops.boxes as box_ops
import torch.nn.functional as F

TRAJ_DATA_JSON_FILENAME = "traj_data.json"

render_settings = dict()
render_settings['renderImage'] = True
render_settings['renderDepthImage'] = False
render_settings['renderObjectImage'] = False
render_settings['renderClassImage'] = False


class MaskDetectorOutput(NamedTuple):
    """
    The output type from the forward pass of a `RegionDetector`.
    """

    box_features: List[Tensor]
    """
    A list of tensors, each with shape `(num_boxes, feature_dim)`.
    """

    boxes: List[Tensor]
    """
    A list of tensors containing the coordinates for each box. Each has shape `(num_boxes, 4)`.
    """

    masks: List[Tensor]

    class_probs: Optional[List[Tensor]] = None
    """
    An optional list of tensors. These tensors can have shape `(num_boxes,)` or
    `(num_boxes, *)` if probabilities for multiple classes are given.
    """

    class_labels: Optional[List[Tensor]] = None
    """
    An optional list of tensors that give the labels corresponding to the `class_probs`
    tensors. This should be non-`None` whenever `class_probs` is, and each tensor
    should have the same shape as the corresponding tensor from `class_probs`.
    """


class MaskRCNNDetector(torch.nn.Module):
    """
    !!! Note
        This module does not have any trainable parameters by default.
        All pretrained weights are frozen.

    # Parameters

    box_score_thresh : `float`, optional (default = `0.05`)
        During inference, only proposal boxes / regions with a label classification score
        greater than `box_score_thresh` will be returned.

    box_nms_thresh : `float`, optional (default = `0.5`)
        During inference, non-maximum suppression (NMS) will be applied to groups of boxes
        that share a common label.

        NMS iteratively removes lower scoring boxes which have an intersection-over-union (IoU)
        greater than `box_nms_thresh` with another higher scoring box.

    max_boxes_per_image : `int`, optional (default = `100`)
        During inference, at most `max_boxes_per_image` boxes will be returned. The
        number of boxes returned will vary by image and will often be lower
        than `max_boxes_per_image` depending on the values of `box_score_thresh`
        and `box_nms_thresh`.

    checkpoint_path : `str`, optional (default = `None`)
        If specified, a fine-tuned MaskRCNN checkpoint is loaded from this path instead of the
        pretrained Torchvision model.
    """

    def __init__(
            self,
            *,
            box_score_thresh: float = 0.05,
            box_nms_thresh: float = 0.5,
            max_boxes_per_image: int = 100,
            checkpoint_path: Optional[str] = None,
            device="cpu"
    ):
        super().__init__()

        if checkpoint_path is None:
            self.detector = torchvision.models.detection.maskrcnn_resnet50_fpn(
                pretrained=True,
                box_score_thresh=box_score_thresh,
                box_nms_thresh=box_nms_thresh,
                box_detections_per_img=max_boxes_per_image,
            )
        else:

            if "moca" in checkpoint_path:
                maskrcnn = MaskRCNN(num_classes=119, hidden_size=256,
                                    inference_params=dict(box_score_thresh=box_score_thresh,
                                                          box_nms_thresh=box_nms_thresh,
                                                          box_detections_per_img=max_boxes_per_image))

                state_dict = torch.load(checkpoint_path, map_location="cpu")

                new_state_dict = {"detector." + k: v for k, v in state_dict.items()}

                maskrcnn.load_state_dict(new_state_dict)
                self.detector = maskrcnn.detector
            else:
                self.detector = MaskRCNN.load_from_checkpoint(
                    checkpoint_path,
                    inference_params=dict(box_score_thresh=box_score_thresh,
                                          box_nms_thresh=box_nms_thresh,
                                          box_detections_per_img=max_boxes_per_image)
                )
                # unwrap to get the underlying torchvision MaskRCNN module
                self.detector = self.detector.detector

        # Freeze all weights.
        for parameter in self.detector.parameters():
            parameter.requires_grad = False
        self.detector.eval()

    def forward(
            self,
            images: torch.FloatTensor
    ) -> List[Dict[str, torch.Tensor]]:
        """
        Extract regions and region features from the given images.

        The `images` should be standardized and resized using the default settings
        for the `TorchImageLoader`.
        """
        if self.detector.training:
            raise RuntimeError(
                "MaskRcnnRegionDetector can not be used for training at the moment"
            )

        original_image_sizes = torch.jit.annotate(List[Tuple[int, int]], [])
        for img in images:
            val = img.shape[-2:]
            assert len(val) == 2
            original_image_sizes.append((val[0], val[1]))

        images, targets = self.detector.transform(images)

        image_features = self.detector.backbone(images.tensors)
        if isinstance(image_features, torch.Tensor):
            image_features = collections.OrderedDict([('0', image_features)])

        # `proposals` is a list of tensors, one tensor per image, each representing a
        # fixed number of proposed regions/boxes.
        # shape (proposals[i]): (proposals_per_image, 4)
        proposals: List[Tensor]
        proposals, _ = self.detector.rpn(images, image_features)

        # shape: (batch_size * proposals_per_image, *)
        box_features = self.detector.roi_heads.box_roi_pool(image_features, proposals, images.image_sizes)

        # shape: (batch_size * proposals_per_image, *)
        box_features = self.detector.roi_heads.box_head(box_features)

        # shape (class_logits): (batch_size * proposals_per_image, num_classes)
        # shape (box_regression): (batch_size * proposals_per_image, regression_output_size)
        class_logits, box_regression = self.detector.roi_heads.box_predictor(box_features)

        # This step filters down the `proposals` to only detections that reach
        # a certain threshold.
        # Each of these is a list of tensors, one for each image in the batch.
        # shape (boxes[i]): (num_predicted_boxes, 4)
        # shape (features[i]): (num_predicted_boxes, feature_size)
        # shape (scores[i]): (num_predicted_boxes,)
        # shape (labels[i]): (num_predicted_boxes,)
        boxes, box_features, scores, labels = self._postprocess_detections(
            class_logits, box_features, box_regression, proposals, images.image_sizes
        )

        num_images = len(boxes)
        result = torch.jit.annotate(List[Dict[str, torch.Tensor]], [])
        for i in range(num_images):
            result.append(
                {
                    "features": box_features[i],
                    "boxes": boxes[i],
                    "labels": labels[i],
                    "scores": scores[i],
                }
            )

        # compute masks as well
        mask_proposals = boxes
        mask_features = self.detector.roi_heads.mask_roi_pool(image_features, mask_proposals, images.image_sizes)
        mask_features = self.detector.roi_heads.mask_head(mask_features)
        mask_logits = self.detector.roi_heads.mask_predictor(mask_features)

        labels = [r["labels"] for r in result]
        masks_probs = maskrcnn_inference(mask_logits, labels)
        for mask_prob, r in zip(masks_probs, result):
            r["masks"] = mask_prob

        detections = self.detector.transform.postprocess(result, images.image_sizes, original_image_sizes)

        return detections

    def _postprocess_detections(
            self,
            class_logits: Tensor,
            box_features: Tensor,
            box_regression: Tensor,
            proposals: List[Tensor],
            image_shapes: List[Tuple[int, int]],
    ) -> Tuple[List[Tensor], List[Tensor], List[Tensor], List[Tensor]]:
        """
        Adapted from https://github.com/pytorch/vision/blob/
        4521f6d152875974e317fa247a633e9ad1ea05c8/torchvision/models/detection/roi_heads.py#L664.

        The only reason we have to re-implement this method is so we can pull out the box
        features that we want.
        """
        device = class_logits.device
        num_classes = class_logits.shape[-1]

        boxes_per_image = [boxes_in_image.shape[0] for boxes_in_image in proposals]

        # shape: (batch_size * boxes_per_image, num_classes, 4)
        pred_boxes = self.detector.roi_heads.box_coder.decode(box_regression, proposals)

        pred_scores = F.softmax(class_logits, -1)

        pred_boxes_list = pred_boxes.split(boxes_per_image, 0)
        features_list = box_features.split(boxes_per_image, dim=0)
        pred_scores_list = pred_scores.split(boxes_per_image, 0)

        all_boxes = []
        all_features = []
        all_scores = []
        all_labels = []
        for boxes, features, scores, image_shape in zip(
                pred_boxes_list, features_list, pred_scores_list, image_shapes
        ):
            # shape: (boxes_per_image, num_classes, 4)
            boxes = box_ops.clip_boxes_to_image(boxes, image_shape)

            # shape: (boxes_per_image, num_classes, feature_size)
            features = features.unsqueeze(1).expand(boxes.shape[0], boxes.shape[1], -1)

            # create labels for each prediction
            # shape: (num_classes,)
            labels = torch.arange(num_classes, device=device)
            # shape: (boxes_per_image, num_classes,)
            labels = labels.view(1, -1).expand_as(scores)

            # remove predictions with the background label
            # shape: (boxes_per_image, num_classes - 1, 4)
            boxes = boxes[:, 1:]
            # shape: (boxes_per_image, num_classes - 1, feature_size)
            features = features[:, 1:]
            # shape: (boxes_per_image, num_classes - 1,)
            scores = scores[:, 1:]
            # shape: (boxes_per_image, num_classes - 1,)
            labels = labels[:, 1:]

            # batch everything, by making every class prediction be a separate instance
            # shape: (boxes_per_image * (num_classes - 1), 4)
            boxes = boxes.reshape(-1, 4)
            # shape: (boxes_per_image * (num_classes - 1), feature_size)
            features = features.reshape(boxes.shape[0], -1)
            # shape: (boxes_per_image * (num_classes - 1),)
            scores = scores.reshape(-1)
            # shape: (boxes_per_image * (num_classes - 1),)
            labels = labels.reshape(-1)

            # remove low scoring boxes
            inds = torch.where(scores > self.detector.roi_heads.score_thresh)[0]
            boxes, features, scores, labels = (
                boxes[inds],
                features[inds],
                scores[inds],
                labels[inds],
            )

            # remove empty boxes
            keep = box_ops.remove_small_boxes(boxes, min_size=1e-2)
            boxes, features, scores, labels = (
                boxes[keep],
                features[keep],
                scores[keep],
                labels[keep],
            )

            # non-maximum suppression, independently done per class
            keep = box_ops.batched_nms(boxes, scores, labels, self.detector.roi_heads.nms_thresh)
            # keep only topk scoring predictions
            keep = keep[: self.detector.roi_heads.detections_per_img]
            boxes, features, scores, labels = (
                boxes[keep],
                features[keep],
                scores[keep],
                labels[keep],
            )

            all_boxes.append(boxes)
            all_features.append(features)
            all_scores.append(scores)
            all_labels.append(labels)

        return all_boxes, all_features, all_scores, all_labels


class CustomImageLoader(TorchImageLoader):
    def __init__(self, *,
                 image_backend: Optional[str] = None,
                 size_divisibility: int = 32,
                 **kwargs):
        super().__init__(image_backend=image_backend, size_divisibility=size_divisibility, **kwargs)
        self.transforms = transforms.Compose([
            transforms.ToTensor()
        ])

    def load(self, image):
        return self.transforms(image)

    def __call__(self, image_or_images: Union[Image.Image, Iterable[Image.Image]], pack=False):
        if not isinstance(image_or_images, (list, tuple)):
            image, size = self([image_or_images])
            return image[0], size[0]

        images: List[torch.FloatTensor] = []
        sizes: List[torch.IntTensor] = []
        for image in image_or_images:
            image = self.load(image).to(self.device)
            size = cast(
                torch.IntTensor,
                torch.tensor(
                    [image.shape[-2], image.shape[-1]], dtype=torch.int32, device=self.device
                ),
            )
            images.append(image)
            sizes.append(size)

        if pack:
            return torch.stack(images), torch.stack(sizes)

        return images, sizes


def create_panorama(env, rotation_steps):
    # This is the front view of the agent
    initial_agent = env.last_event.metadata["agent"]
    curr_image = Image.fromarray(env.last_event.frame)
    panorama_frames = [curr_image]
    camera_info = [dict(
        h_view_angle=env.last_event.metadata["agent"]["rotation"]["y"],
        # flip the sign of the camera horizon (pitch): negative will be down and positive will be up
        v_view_angle=-env.last_event.metadata["agent"]["cameraHorizon"]
    )]
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
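
For reference, the mask branch in both copies delegates per-detection mask selection to torchvision's `maskrcnn_inference`. A minimal sketch of that selection step on toy tensors (all sizes here are made up for illustration; the real helper additionally splits the result per image):

import torch

num_dets, num_classes, m = 3, 91, 28           # toy sizes, assumptions for illustration
mask_logits = torch.randn(num_dets, num_classes, m, m)
labels = torch.tensor([5, 17, 42])             # predicted label per detection

mask_probs = mask_logits.sigmoid()
index = torch.arange(num_dets)
# keep only the mask channel that matches each detection's predicted label
selected = mask_probs[index, labels][:, None]  # shape: (num_dets, 1, m, m)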



