in agora/contoso_motors/src/webapp-decode/pose_decoder.py [0:0]
def _match_by_tag(self, inp):
tag_k, loc_k, val_k = inp
embd_size = tag_k.shape[2]
all_joints = np.concatenate((loc_k, val_k[..., None], tag_k), -1)
poses = []
for idx in self.joint_order:
tags = tag_k[idx]
joints = all_joints[idx]
mask = joints[:, 2] > self.detection_threshold
tags = tags[mask]
joints = joints[mask]
if len(poses) == 0:
for tag, joint in zip(tags, joints):
pose = Pose(self.num_joints, embd_size)
pose.add(idx, joint, tag)
poses.append(pose)
continue
if joints.shape[0] == 0 or (self.ignore_too_much and len(poses) == self.max_num_people):
continue
poses_tags = np.stack([p.tag for p in poses], axis=0)
diff = tags[:, None] - poses_tags[None, :]
diff_normed = np.linalg.norm(diff, ord=2, axis=2)
diff_saved = np.copy(diff_normed)
if self.dist_reweight:
# Reweight cost matrix to prefer nearby points among all that are close enough in a tag space.
centers = np.stack([p.center for p in poses], axis=0)[None]
dists = np.linalg.norm(joints[:, :2][:, None, :] - centers, ord=2, axis=2)
close_tags_masks = diff_normed < self.tag_threshold
min_dists = np.min(dists, axis=0, keepdims=True)
dists /= min_dists + 1e-10
diff_normed[close_tags_masks] *= dists[close_tags_masks]
if self.use_detection_val:
diff_normed = np.round(diff_normed) * 100 - joints[:, 2:3]
num_added = diff.shape[0]
num_grouped = diff.shape[1]
if num_added > num_grouped:
diff_normed = np.pad(diff_normed, ((0, 0), (0, num_added - num_grouped)),
mode='constant', constant_values=1e10)
pairs = self._max_match(diff_normed)
for row, col in pairs:
if row < num_added and col < num_grouped and diff_saved[row][col] < self.tag_threshold:
poses[col].add(idx, joints[row], tags[row])
else:
pose = Pose(self.num_joints, embd_size)
pose.add(idx, joints[row], tags[row])
poses.append(pose)
ans = np.asarray([p.pose for p in poses], dtype=np.float32).reshape(-1, self.num_joints, 2 + 1 + embd_size)
tags = np.asarray([p.tag for p in poses], dtype=np.float32).reshape(-1, embd_size)
return ans, tags