# models/base_model.py
# Imports needed to make this snippet self-contained; the enclosing class is
# omitted from this excerpt and is assumed to be an nn.Module subclass.
from typing import Dict, Tuple

import hydra
import torch
import torch.nn as nn
from omegaconf import OmegaConf

# Assumed placeholder value; the real constant is defined elsewhere in the
# module and is used below when registering the class-mapping buffers.
CLS_MAP_PREFIX = 'cls_map_'


def __init__(self, model_cfg: OmegaConf, num_classes: Dict[str, int],
             class_mappings: Dict[Tuple[str, str], torch.FloatTensor]):
    super().__init__()
# Takes as input (B, T, H, W, C) -> (B, T', H', W', C')
_backbone_full = hydra.utils.instantiate(
model_cfg.backbone,
        # Pass a dummy value for num_classes; the classification head it
        # creates is dropped just below (when backbone_last_n_modules_to_drop
        # is positive).
        num_classes=1)
if model_cfg.backbone_last_n_modules_to_drop > 0:
self.backbone = nn.Sequential()
for name, child in list(_backbone_full.named_children(
))[:-model_cfg.backbone_last_n_modules_to_drop]:
self.backbone.add_module(name, child)
else:
self.backbone = _backbone_full
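    # Illustrative note (backbone actually comes from model_cfg.backbone): for
    # a torchvision video ResNet, whose last two children are (avgpool, fc),
    # dropping 2 modules keeps spatiotemporal feature maps instead of logits.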
    # Map the backbone features (B, T', H', W', C') -> (B, T', H', W', C*),
    # i.e. onto the intermediate feature dimension.
    # IMP: this mapper is only created (and used) if C' != C*
    if (model_cfg.backbone_last_n_modules_to_drop == 0
            and hasattr(self.backbone, 'output_dim')):
backbone_dim = self.backbone.output_dim
else:
        backbone_dim = model_cfg.backbone_dim  # TODO: infer this automatically
self.mapper_to_inter = None
if model_cfg.intermediate_featdim is None:
model_cfg.intermediate_featdim = backbone_dim
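        # NOTE: this writes the resolved dimension back into the (mutable)
        # config, so downstream consumers of model_cfg see the actual value.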
if backbone_dim != model_cfg.intermediate_featdim:
self.mapper_to_inter = nn.Linear(backbone_dim,
model_cfg.intermediate_featdim,
bias=False)
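        # A bias-free linear map applied along the channel dimension; it lets
        # backbones with different feature widths share the downstream modules.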
# Takes as input (B, T', H', W', C*) -> (B, C**)
self.temporal_aggregator = hydra.utils.instantiate(
model_cfg.temporal_aggregator,
in_features=model_cfg.intermediate_featdim)
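    # The instantiated aggregator is assumed to expose an `output_dim`
    # attribute; it is read just below to wire up the following modules.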
self.reset_temp_agg_feat_dim = nn.Sequential()
temp_agg_output_dim = self.temporal_aggregator.output_dim
if model_cfg.same_temp_agg_dim and (temp_agg_output_dim !=
model_cfg.intermediate_featdim):
        # Keep the dimension unchanged so that the same project_mlp can be
        # applied either to the temporally aggregated features or to the
        # original (pre-aggregation) features.
self.reset_temp_agg_feat_dim = nn.Linear(
temp_agg_output_dim, model_cfg.intermediate_featdim)
temp_agg_output_dim = model_cfg.intermediate_featdim
# Transforms the current features to future ones
# (B, C**) -> (B, C**)
self.future_predictor = hydra.utils.instantiate(
model_cfg.future_predictor,
in_features=temp_agg_output_dim,
_recursive_=False)
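    # _recursive_=False keeps hydra from recursively instantiating any nested
    # configs, leaving the future predictor to build its own submodules.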
# Projection layer
self.project_mlp = nn.Sequential()
if model_cfg.project_dim_for_nce is not None:
self.project_mlp = nn.Sequential(
nn.Linear(temp_agg_output_dim, temp_agg_output_dim),
nn.ReLU(inplace=True),
nn.Linear(temp_agg_output_dim, model_cfg.project_dim_for_nce))
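        # A 2-layer projection head (Linear -> ReLU -> Linear), as is common
        # for InfoNCE-style contrastive losses; when project_dim_for_nce is
        # None it stays the empty (identity) Sequential created above.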
# 2nd round of temporal aggregation, if needed
self.temporal_aggregator_after_future_pred = hydra.utils.instantiate(
model_cfg.temporal_aggregator_after_future_pred,
self.future_predictor.output_dim)
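    # NOTE: the input dimension is passed positionally here, so the target
    # class's first constructor argument must be the feature dimension.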
# Dropout
self.dropout = nn.Dropout(model_cfg.dropout)
# Takes as input (B, C**) -> (B, num_classes)
cls_input_dim = self.temporal_aggregator_after_future_pred.output_dim
# Make a separate classifier for each output
self.classifiers = nn.ModuleDict()
self.num_classes = num_classes
for i, (cls_type, cls_dim) in enumerate(num_classes.items()):
if model_cfg.use_cls_mappings and i > 0:
# In this case, rely on the class mappings to generate the
# other predictions, rather than creating a new linear layer
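            # (dicts preserve insertion order, so only the first entry of
            # num_classes gets a dedicated classifier head)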
break
self.classifiers.update({
cls_type:
hydra.utils.instantiate(model_cfg.classifier,
in_features=cls_input_dim,
out_features=cls_dim)
})
# Store the class mappings as buffers
for (src, dst), mapping in class_mappings.items():
self.register_buffer(f'{CLS_MAP_PREFIX}{src}_{dst}', mapping)
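    # Buffers travel with the module across devices and are saved in the
    # state_dict, but are never updated by the optimizer; appropriate for
    # these fixed class-mapping matrices.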
self.regression_head = None
if model_cfg.add_regression_head:
self.regression_head = nn.Linear(cls_input_dim, 1)
# Init weights, as per the video resnets
self._initialize_weights()
    # Set the BN momentum and eps here; Du uses a different value and it's
    # important.
self._set_bn_params(model_cfg.bn.eps, model_cfg.bn.mom)
self.cfg = model_cfg
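

# Below is a minimal sketch of the config this constructor consumes, with one
# entry per key read above. Every `_target_` path and number is a hypothetical
# placeholder for illustration, not the repo's actual defaults.
def _example_model_cfg():
    return OmegaConf.create({
        'backbone': {'_target_': 'models.backbones.VideoResNet'},  # hypothetical
        'backbone_last_n_modules_to_drop': 2,  # e.g. drop avgpool + fc
        'backbone_dim': 512,
        'intermediate_featdim': None,  # None -> falls back to backbone_dim
        'temporal_aggregator': {'_target_': 'models.aggregators.MeanPool'},  # hypothetical
        'same_temp_agg_dim': True,
        'future_predictor': {'_target_': 'models.predictors.Identity'},  # hypothetical
        'project_dim_for_nce': 128,  # set to None to disable the NCE head
        'temporal_aggregator_after_future_pred': {'_target_': 'models.aggregators.MeanPool'},  # hypothetical
        'dropout': 0.5,
        'classifier': {'_target_': 'torch.nn.Linear'},
        'use_cls_mappings': False,
        'add_regression_head': False,
        'bn': {'eps': 1e-5, 'mom': 0.1},
    })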