evaluation/tiny_benchmark/maskrcnn_benchmark/modeling/rpn/rpn.py (182 lines of code) (raw):
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
import torch
import torch.nn.functional as F
from torch import nn
from maskrcnn_benchmark.modeling import registry
from maskrcnn_benchmark.modeling.box_coder import BoxCoder
from maskrcnn_benchmark.modeling.rpn.retinanet.retinanet import build_retinanet
from maskrcnn_benchmark.modeling.rpn.fcos.fcos import build_fcos
from maskrcnn_benchmark.modeling.rpn.cascade_fcos.cascade_fcos import build_cascade_fcos
from maskrcnn_benchmark.modeling.rpn.locnet.locnet import build_location_net
from maskrcnn_benchmark.modeling.rpn.gaussian_net.gaussian_net import build_gaussian_net
import maskrcnn_benchmark.modeling.rpn.retinanet_fa.retinanet as retinanet_fa
from .loss import make_rpn_loss_evaluator
from .anchor_generator import make_anchor_generator
from .inference import make_rpn_postprocessor
class RPNHeadConvRegressor(nn.Module):
"""
A simple RPN Head for classification and bbox regression
"""
def __init__(self, cfg, in_channels, num_anchors):
"""
Arguments:
cfg : config
in_channels (int): number of channels of the input feature
num_anchors (int): number of anchors to be predicted
"""
super(RPNHeadConvRegressor, self).__init__()
self.cls_logits = nn.Conv2d(in_channels, num_anchors, kernel_size=1, stride=1)
self.bbox_pred = nn.Conv2d(
in_channels, num_anchors * 4, kernel_size=1, stride=1
)
for l in [self.cls_logits, self.bbox_pred]:
torch.nn.init.normal_(l.weight, std=0.01)
torch.nn.init.constant_(l.bias, 0)
def forward(self, x):
assert isinstance(x, (list, tuple))
logits = [self.cls_logits(y) for y in x]
bbox_reg = [self.bbox_pred(y) for y in x]
return logits, bbox_reg
class RPNHeadFeatureSingleConv(nn.Module):
"""
Adds a simple RPN Head with one conv to extract the feature
"""
def __init__(self, cfg, in_channels):
"""
Arguments:
cfg : config
in_channels (int): number of channels of the input feature
"""
super(RPNHeadFeatureSingleConv, self).__init__()
self.conv = nn.Conv2d(
in_channels, in_channels, kernel_size=3, stride=1, padding=1
)
for l in [self.conv]:
torch.nn.init.normal_(l.weight, std=0.01)
torch.nn.init.constant_(l.bias, 0)
self.out_channels = in_channels
def forward(self, x):
assert isinstance(x, (list, tuple))
x = [F.relu(self.conv(z)) for z in x]
return x
@registry.RPN_HEADS.register("SingleConvRPNHead")
class RPNHead(nn.Module):
"""
Adds a simple RPN Head with classification and regression heads
"""
def __init__(self, cfg, in_channels, num_anchors):
"""
Arguments:
cfg : config
in_channels (int): number of channels of the input feature
num_anchors (int): number of anchors to be predicted
"""
super(RPNHead, self).__init__()
self.conv = nn.Conv2d(
in_channels, in_channels, kernel_size=3, stride=1, padding=1
)
self.cls_logits = nn.Conv2d(in_channels, num_anchors, kernel_size=1, stride=1)
self.bbox_pred = nn.Conv2d(
in_channels, num_anchors * 4, kernel_size=1, stride=1
)
for l in [self.conv, self.cls_logits, self.bbox_pred]:
torch.nn.init.normal_(l.weight, std=0.01)
torch.nn.init.constant_(l.bias, 0)
def forward(self, x):
logits = []
bbox_reg = []
for feature in x:
t = F.relu(self.conv(feature))
logits.append(self.cls_logits(t))
bbox_reg.append(self.bbox_pred(t))
return logits, bbox_reg
@registry.RPN_HEADS.register("MultiConvRPNHead")
class MultiConvRPNHead(nn.Module):
"""
Adds a simple RPN Head with classification and regression heads
"""
def __init__(self, cfg, in_channels, num_anchors):
"""
Arguments:
cfg : config
in_channels (int): number of channels of the input feature
num_anchors (int): number of anchors to be predicted
"""
super(MultiConvRPNHead, self).__init__()
cls_tower = []
bbox_tower = []
for i in range(cfg.MODEL.RPN.NUM_CONVS):
cls_tower.append(
nn.Conv2d(
in_channels,
in_channels,
kernel_size=3,
stride=1,
padding=1
)
)
cls_tower.append(nn.ReLU())
bbox_tower.append(
nn.Conv2d(
in_channels,
in_channels,
kernel_size=3,
stride=1,
padding=1
)
)
bbox_tower.append(nn.ReLU())
self.add_module('cls_tower', nn.Sequential(*cls_tower))
self.add_module('bbox_tower', nn.Sequential(*bbox_tower))
self.cls_logits = nn.Conv2d(in_channels, num_anchors, kernel_size=1, stride=1)
self.bbox_pred = nn.Conv2d(
in_channels, num_anchors * 4, kernel_size=1, stride=1
)
# Initialization
for modules in [self.cls_tower, self.bbox_tower, self.cls_logits, self.bbox_pred]:
for l in modules.modules():
if isinstance(l, nn.Conv2d):
torch.nn.init.normal_(l.weight, std=0.01)
torch.nn.init.constant_(l.bias, 0)
def forward(self, x):
logits = []
bbox_reg = []
for feature in x:
logits.append(self.cls_logits(self.cls_tower(feature)))
bbox_reg.append(self.bbox_pred(self.bbox_tower(feature)))
return logits, bbox_reg
class RPNModule(torch.nn.Module):
"""
Module for RPN computation. Takes feature maps from the backbone and RPN
proposals and losses. Works for both FPN and non-FPN.
"""
def __init__(self, cfg, in_channels):
super(RPNModule, self).__init__()
self.cfg = cfg.clone()
anchor_generator = make_anchor_generator(cfg)
rpn_head = registry.RPN_HEADS[cfg.MODEL.RPN.RPN_HEAD]
head = rpn_head(
cfg, in_channels, anchor_generator.num_anchors_per_location()[0]
)
rpn_box_coder = BoxCoder(weights=(1.0, 1.0, 1.0, 1.0))
box_selector_train = make_rpn_postprocessor(cfg, rpn_box_coder, is_train=True)
box_selector_test = make_rpn_postprocessor(cfg, rpn_box_coder, is_train=False)
loss_evaluator = make_rpn_loss_evaluator(cfg, rpn_box_coder)
self.anchor_generator = anchor_generator
self.head = head
self.box_selector_train = box_selector_train
self.box_selector_test = box_selector_test
self.loss_evaluator = loss_evaluator
def forward(self, images, features, targets=None):
"""
Arguments:
images (ImageList): images for which we want to compute the predictions
features (list[Tensor]): features computed from the images that are
used for computing the predictions. Each tensor in the list
correspond to different feature levels
targets (list[BoxList): ground-truth boxes present in the image (optional)
Returns:
boxes (list[BoxList]): the predicted boxes from the RPN, one BoxList per
image.
losses (dict[Tensor]): the losses for the model during training. During
testing, it is an empty dict.
"""
objectness, rpn_box_regression = self.head(features)
anchors = self.anchor_generator(images, features)
# show_image(images)
if self.training:
return self._forward_train(anchors, objectness, rpn_box_regression, targets)
else:
return self._forward_test(anchors, objectness, rpn_box_regression)
def _forward_train(self, anchors, objectness, rpn_box_regression, targets):
if self.cfg.MODEL.RPN_ONLY:
# When training an RPN-only model, the loss is determined by the
# predicted objectness and rpn_box_regression values and there is
# no need to transform the anchors into predicted boxes; this is an
# optimization that avoids the unnecessary transformation.
boxes = anchors
else:
# For end-to-end models, anchors must be transformed into boxes and
# sampled into a training batch.
with torch.no_grad():
boxes = self.box_selector_train(
anchors, objectness, rpn_box_regression, targets
)
loss_objectness, loss_rpn_box_reg = self.loss_evaluator(
anchors, objectness, rpn_box_regression, targets
)
losses = {
"loss_objectness": loss_objectness,
"loss_rpn_box_reg": loss_rpn_box_reg,
}
return boxes, losses
def _forward_test(self, anchors, objectness, rpn_box_regression):
boxes = self.box_selector_test(anchors, objectness, rpn_box_regression)
if self.cfg.MODEL.RPN_ONLY:
# For end-to-end models, the RPN proposals are an intermediate state
# and don't bother to sort them in decreasing score order. For RPN-only
# models, the proposals are the final output and we return them in
# high-to-low confidence order.
inds = [
box.get_field("objectness").sort(descending=True)[1] for box in boxes
]
boxes = [box[ind] for box, ind in zip(boxes, inds)]
return boxes, {}
def build_rpn(cfg, in_channels):
"""
This gives the gist of it. Not super important because it doesn't change as much
"""
# add by hui ###############################
if cfg.MODEL.LOC_ON:
return build_location_net(cfg, in_channels)
if cfg.MODEL.FCOS_ON:
if cfg.MODEL.FCOS.CASCADE_ON:
return build_cascade_fcos(cfg, in_channels)
else:
return build_fcos(cfg, in_channels)
if cfg.MODEL.GAU_ON:
return build_gaussian_net(cfg, in_channels)
############################################################
if cfg.MODEL.RETINANET_ON:
# add by hui ###############################
if cfg.FREEANCHOR.FREEANCHOR_ON:
return retinanet_fa.build_retinanet(cfg, in_channels)
############################################################
return build_retinanet(cfg, in_channels)
return RPNModule(cfg, in_channels)
# ##################### add by hui
def show_image(images
# , targets, loss_eval, infer, locations, loc_strides, logits
):
import numpy as np
import matplotlib.pyplot as plt
# cls_logits, gau_logits = logits
# eps = 1e-6
# # vis label infer result
# labels, matched_gt_idxs = loss_eval.prepare_targets(locations, targets)
# labels_flatten = []
# gau_labels = []
# for l in range(len(labels)):
# N, C, H, W = cls_logits[l].shape
# labels_flatten.append(labels[l].reshape(N, H, W, -1).permute(0, 3, 1, 2))
# gau_labels.append((labels_flatten[l] > eps).float())
# infer.is_logit = False
# boxlists = infer(loc_strides, (labels_flatten, gau_labels), images.image_sizes)
# infer.is_logit = True
# infer_label = boxlists[0].bbox.cpu().numpy().tolist()
# # assert len(targets[0].bbox) == len(infer_label), "{} vs {}".format(len(targets[0].bbox), len(infer_label))
# # if len(targets[0].bbox) != len(infer_label):
plt.figure()
img = images.tensors[0].permute((1, 2, 0)).cpu().numpy() + np.array([102.9801, 115.9465, 122.7717])
plt.imshow(img / 255)
# for (x1, y1, x2, y2) in targets[0].bbox.cpu().numpy().tolist():
# plt.axes().add_patch(
# plt.Rectangle((x1, y1), x2 - x1 + 1, y2 - y1 + 1, fill=False, color=(1, 0, 0), linewidth=2)
# )
# for (ix1, iy1, ix2, iy2) in infer_label:
# plt.axes().add_patch(
# plt.Rectangle((ix1 + 5, iy1+5,), ix2 - ix1 + 1, iy2 - iy1 + 1, fill=False, color=(0, 1, 0), linewidth=2)
# )
# plt.title("image gt:{}, infer: {}".format(len(targets[0].bbox), len(infer_label)))
plt.show()