evaluation/tiny_benchmark/MyPackage/tools/pub/split_and_merge_image.py (379 lines of code) (raw):
import numpy as np
from copy import deepcopy
import json
import os
import cv2
from PIL import Image
from tqdm import tqdm
class SplitImage(object):
    """Split an image into a grid of (optionally overlapping) sub-images.

    All corners are ``(x1, y1, x2, y2)`` in the coordinate system of the
    original image; boxes returned for a sub-image are relative to that
    sub-image's top-left corner.
    """

    def __init__(self, use_fix_size=False, use_least_pieces_size=False):
        """
        :param use_fix_size: bool, if True a window that overruns the image is shifted back
            inside the image (keeping its size when the image is large enough) instead of
            being clipped at the border.
        :param use_least_pieces_size: bool, if True, splitting by ``sub_image_size`` swaps
            (sw, sh) when the swapped orientation needs fewer pieces (overlap is ignored
            in that comparison).
        """
        self.use_fix_size = use_fix_size
        self.use_least_pieces_size = use_least_pieces_size

    def __get_corners(self, image_size, piece, stride, overlap):
        """
        Build the grid of window corners.

        :param image_size: tuple, (w, h) of origin image.
        :param piece: (pw, ph), number of windows along x and y.
        :param stride: np.ndarray, (stride_x, stride_y), distance between window origins.
        :param overlap: np.ndarray, (ow, oh), extra extent added to every window.
        :return: np.ndarray, (x1, y1, x2, y2), shape=(ph, pw, 4), corner of sub images.
        """
        def left_move(image_size, corners):
            # Shift windows that stick out of the image back inside instead of
            # shrinking them; the origin is still clamped at 0 for small images.
            iw, ih = image_size
            new_corners = []
            for sx, sy, ex, ey in corners.reshape(-1, 4):
                if ex > iw:
                    sx -= (ex - iw)
                    ex = iw
                if ey > ih:
                    sy -= (ey - ih)
                    ey = ih
                sx, sy = max(0, sx), max(0, sy)
                new_corners.append([sx, sy, ex, ey])
            return np.array(new_corners).reshape(corners.shape)

        # corners[x][y] is built column by column, hence the transpose at the end.
        corners = []
        for x in range(piece[0]):
            col_corners = []
            for y in range(piece[1]):
                sx, sy = np.array([x, y]) * stride
                ex, ey = (np.array([x, y]) + 1) * stride + overlap
                col_corners.append([sx, sy, ex, ey])
            corners.append(col_corners)
        corners = np.array(corners)

        if self.use_fix_size:
            corners = left_move(image_size, corners)
        else:
            corners[:, :, [0, 2]] = np.clip(corners[:, :, [0, 2]], 0, image_size[0])
            corners[:, :, [1, 3]] = np.clip(corners[:, :, [1, 3]], 0, image_size[1])
        return corners.transpose((1, 0, 2))

    def __get_sub_image_corners_by_pieces(self, image_size, pieces=(1, 1), overlap=(0, 0)):
        """
        :param image_size: tuple, (w, h) of origin image .
        :param pieces: tuple, (pw, ph), how many pieces to split image.
        :param overlap: tuple, (ow, oh), overlap of two sub neighbour image.
        :return: np.ndarray, (x1, y1, x2, y2), shape=(ph, pw, 4), corner of sub images.
        """
        size, pieces, overlap = np.array(image_size), np.array(list(pieces)), np.array(list(overlap))
        stride = np.ceil((size - overlap) / pieces).astype(np.int64)
        return self.__get_corners(image_size, pieces, stride, overlap)

    def __get_sub_image_corners_by_size(self, image_size, sub_image_size, overlap=(0, 0)):
        """
        :param image_size: tuple, (w, h) of origin image .
        :param sub_image_size: tuple, (sw, sh), size of sub image.
        :param overlap: tuple, (ow, oh), overlap of two sub neighbour image.
        :return: np.ndarray, (x1, y1, x2, y2), shape=(ph, pw, 4), corner of sub images.
        """
        def least_pieces_size(image_size, size):
            # Compare the piece count of (sw, sh) against the swapped (sh, sw);
            # ties keep the orientation that was passed in.
            c1 = np.ceil(image_size[0] / size[0]) * np.ceil(image_size[1] / size[1])
            c2 = np.ceil(image_size[0] / size[1]) * np.ceil(image_size[1] / size[0])
            if c1 <= c2:
                return size
            return size[1], size[0]

        if self.use_least_pieces_size:
            sub_image_size = least_pieces_size(image_size, sub_image_size)
        size, sub_image_size, overlap = np.array(image_size), np.array(list(sub_image_size)), np.array(list(overlap))
        stride = np.ceil(sub_image_size - overlap).astype(np.int64)
        piece = np.ceil((size - overlap) / (sub_image_size - overlap)).astype(int)
        return self.__get_corners(image_size, piece, stride, overlap)

    def get_sub_image_corners(self, image_size, pieces=None, sub_image_size=None, overlap=(0, 0)):
        """
        cut function for image
        :param image_size: tuple, (w, h) of origin image .
        :param pieces: tuple, (pw, ph), how many pieces to split image.
        :param sub_image_size: tuple, (sw, sh), size of sub image.
        :param overlap: tuple, (ow, oh), overlap of two sub neighbour image.
        :return: np.ndarray, (x1, y1, x2, y2), shape=(ph, pw, 4), corner of sub images.
        :raises AssertionError: unless exactly one of ``pieces`` / ``sub_image_size`` is given.
        """
        assert (pieces is None) ^ (sub_image_size is None),\
            '"pieces" and "sub_image_size" can only specified one of them. but got {} and {}'.format(
                pieces, sub_image_size)
        if sub_image_size is not None:
            return self.__get_sub_image_corners_by_size(image_size, sub_image_size, overlap)
        return self.__get_sub_image_corners_by_pieces(image_size, pieces, overlap)

    def get_sub_image_boxes(self, corner:np.ndarray, annos:np.ndarray, annos_area:np.ndarray, keep_overlap=0.7):
        """
        cut function for annotation
        :param corner: sub image's corner, (x1, y1, x2, y2)
        :param annos: annotations in origin image, shape=(n, 4), (x1, y1, x2, y2)
        :param annos_area: per-annotation reference size in origin image; callers in this class
            pass sqrt(w * h), and it is compared against sqrt(w * h) of the clipped box.
        :param keep_overlap: a box is kept when (clipped size / origin size) >= keep_overlap.
            Note the compared quantity is a ratio of sqrt(area) values, not of raw areas.
        :return:
            annos: kept annos, clipped to the sub image and relative to its left-up corner
            annos_id[keep]: corresponding annos index in origin image
        """
        annos = annos.copy()
        annos_id = np.array(range(len(annos)))
        annos[:, [0, 2]] = np.clip(annos[:, [0, 2]], corner[0], corner[2])
        annos[:, [1, 3]] = np.clip(annos[:, [1, 3]], corner[1], corner[3])
        new_anno_area = np.sqrt((annos[:, 2] - annos[:, 0]) * (annos[:, 3] - annos[:, 1]))
        keep = (new_anno_area / annos_area) >= keep_overlap
        annos = annos[keep]
        # Shift both points of every kept box by the sub image's origin.
        annos = (annos.reshape((-1, 2)) - corner[:2]).reshape((-1, 4))
        return annos, annos_id[keep]

    def _split_boxes_over_corners(self, corners, boxes, keep_overlap):
        """Run ``get_sub_image_boxes`` for every corner; returns (sub_annos, sub_keeps) lists."""
        boxes_size = np.sqrt((boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1]))
        sub_annos, sub_keeps = [], []
        for corner in corners:
            sub_anno, keep = self.get_sub_image_boxes(corner, boxes, boxes_size, keep_overlap)
            sub_annos.append(sub_anno)
            sub_keeps.append(keep)
        return sub_annos, sub_keeps

    def cut_image(self, image:np.ndarray, annos=None, pieces=None, sub_image_size=None, overlap=(0, 0),
                  anno_keep_overlap=0.7):
        """
        Cut an image (and optionally its boxes) into sub images.

        :param image: np.ndarray, shape=(h, w) or (h, w, c).
        :param annos: None or np.ndarray, shape=(n, 4), (x1, y1, x2, y2) boxes in origin image.
        :param pieces: tuple, (pw, ph), how many pieces to split image.
        :param sub_image_size: tuple, (sw, sh), size of sub image.
        :param overlap: tuple, (ow, oh), overlap of two sub neighbour image.
        :param anno_keep_overlap: see ``keep_overlap`` of ``get_sub_image_boxes``.
        :return: (sub_images, corners, sub_annos, sub_keeps); corners has shape=(ph * pw, 4),
            sub_annos and sub_keeps are None when ``annos`` is None.
        """
        # Only the two leading axes matter, so grayscale (h, w) arrays work too.
        h, w = image.shape[:2]
        corners = self.get_sub_image_corners((w, h), pieces, sub_image_size, overlap)
        corners = corners.reshape((-1, 4))
        sub_images = [image[y1:y2, x1:x2].copy() for (x1, y1, x2, y2) in corners]
        if annos is None:
            return sub_images, corners, None, None
        sub_annos, sub_keeps = self._split_boxes_over_corners(corners, annos, anno_keep_overlap)
        return sub_images, corners, sub_annos, sub_keeps

    def get_sub_image_corner_and_boxes(self, image_size, bboxes:np.ndarray, pieces=None,
                                       sub_image_size=None, overlap=(0, 0), anno_keep_overlap=0.7):
        """
        Same as ``cut_image`` but works from the image size only (no pixels are touched).

        :param image_size: tuple, (w, h) of origin image.
        :param bboxes: np.ndarray, shape=(n, 4), (x1, y1, x2, y2); may be empty.
        :return: (corners, sub_annos, sub_keeps), one sub_annos / sub_keeps entry per corner.
        """
        corners = self.get_sub_image_corners(image_size, pieces, sub_image_size, overlap)
        corners = corners.reshape((-1, 4))
        if len(bboxes) > 0:
            sub_annos, sub_keeps = self._split_boxes_over_corners(corners, bboxes, anno_keep_overlap)
        else:
            sub_annos = [np.array([]).reshape((0, 4)) for _ in corners]
            sub_keeps = [np.array([]).reshape((0, )) for _ in corners]
        return corners, sub_annos, sub_keeps
class MergeResult(object):
    """Map per-sub-image detections back onto the origin image and de-duplicate them."""

    def __init__(self, use_nms=True, nms_th=0.5):
        """
        :param use_nms: bool, whether merged boxes are filtered with NMS.
        :param nms_th: float, threshold handed to the NMS op.
        """
        self.use_nms = use_nms
        self.nms_th = nms_th

    def merge_result(self, corners, results, scores=None):
        """
        :param corners: corner of all sub image
        :param results: result of all sub image, results[i] = np.array([[x1, y1, x2, y2,...]...])
        :param scores: optional scores forwarded to ``nms``
        :return: np.ndarray of boxes in origin-image coordinates
        """
        merged = self.translate_bboxes(corners, results)
        non_empty_count = sum(1 for result in results if len(result) > 0)
        # Duplicates can only come from overlap areas, i.e. when at least two
        # sub images actually contributed boxes.
        if self.use_nms and non_empty_count > 1:
            merged, _ = self.nms(merged, scores)
        return merged

    def merge_maskrcnn_benchmark_result(self, corners, results, im_scales=None, image_size=None):
        """
        Merge results that are either maskrcnn_benchmark ``BoxList`` objects or plain arrays
        whose columns are (x1, y1, x2, y2, label, score).

        :param corners: corner of all sub image
        :param results: per-sub-image results; the type of ``results[0]`` selects the mode
        :param im_scales: per-sub-image scale, required for BoxList input
        :param image_size: size given to the merged BoxList, required for BoxList input
        :return: merged BoxList (BoxList input), merged array (array input), or [] when nothing
            was detected
        """
        import torch
        from maskrcnn_benchmark.structures.boxlist_ops import BoxList

        def boxlist_to_array(boxlist):
            # -> rows of (x1, y1, x2, y2, label, score)
            coords = boxlist.bbox
            label_col = boxlist.extra_fields["labels"].reshape(-1, 1).float()
            score_col = boxlist.extra_fields["scores"].reshape(-1, 1)
            stacked = torch.cat([coords, label_col, score_col], dim=1)
            return stacked.detach().cpu().numpy()

        is_boxlist_input = isinstance(results[0], BoxList)
        if is_boxlist_input:
            assert im_scales is not None and image_size is not None, ''
            det_results = []
            for result, im_scale in zip(results, im_scales):
                rows = boxlist_to_array(result)
                divisor = np.array([im_scale[1], im_scale[0], im_scale[1], im_scale[0]])
                rows[:, :4] = rows[:, :4] / divisor
                det_results.append(rows)
        else:
            det_results = results

        merged = self.translate_bboxes(corners, det_results)
        if len(merged) == 0:
            return []
        _, keep = self.nms(merged[:, :4], merged[:, 5])
        merged = merged[keep]

        if not is_boxlist_input:
            return merged
        merged_boxlist = BoxList(torch.Tensor(merged[:, :4]), image_size, 'xyxy')
        merged_boxlist.add_field("labels", torch.Tensor(merged[:, 4]))
        merged_boxlist.add_field("scores", torch.Tensor(merged[:, 5]))
        return merged_boxlist

    def translate_bboxes(self, corners, results):
        """
        Shift every sub image's boxes by that sub image's left-up corner.

        :param corners: corner of all sub image
        :param results: result of all sub image, results[i] = np.array([[x1, y1, x2, y2,...]...])
        :return: np.ndarray with all non-empty results stacked, in origin-image coordinates
        """
        shifted_rows = []
        for corner, result in zip(corners, results):
            if len(result) > 0:
                shifted = result.copy()
                shifted[:, [0, 2]] += corner[0]
                shifted[:, [1, 3]] += corner[1]
                shifted_rows.extend(shifted.tolist())
        return np.array(shifted_rows)

    def nms(self, merge_result, scores):
        """
        :param merge_result: boxes, (x1, y1, x2, y2) per row
        :param scores: per-box scores, or None to score every box 1
        :return: (kept boxes as np.ndarray, kept indices as np.ndarray)
        """
        from maskrcnn_benchmark.layers import nms as _box_nms
        import torch

        if scores is None:
            score_tensor = torch.ones(size=(len(merge_result),))
        elif isinstance(scores, torch.Tensor):
            score_tensor = scores
        else:
            score_tensor = torch.Tensor(scores)
        box_tensor = torch.Tensor(merge_result)
        keep = _box_nms(box_tensor, score_tensor, self.nms_th)
        kept_boxes = box_tensor[keep].detach().cpu().numpy()
        return kept_boxes, keep.detach().cpu().numpy()
def xywh2xyxy(boxes):
    """Convert (x, y, w, h) boxes to (x1, y1, x2, y2); ``boxes`` has 4 as its last axis."""
    left, top, width, height = boxes.T
    right = left + width
    bottom = top + height
    return np.array([left, top, right, bottom]).T
def xyxy2xywh(boxes):
    """Convert (x1, y1, x2, y2) boxes to (x, y, w, h); ``boxes`` has 4 as its last axis."""
    left, top, right, bottom = boxes.T
    width = right - left
    height = bottom - top
    return np.array([left, top, width, height]).T
class COCOSplitImage(SplitImage):
    """
    Split a COCO-style dataset into sub images.

    if dst_image_dir is not None: (do not need change Dataset in framework)
        generate sub images and save them in dst_image_dir, named after their left-up corner point,
        xx.jpg -> xx_200_300.jpg means the sub image's left-up corner is (200, 300)
        images: change file_name and so on.
            [{'file_name': 'xx_200_300.jpg', 'height': sub_image_h, 'width': sub_image_w, 'id': sub_image_id, ...}..]
    else: (need change Dataset in framework, add 'corner' crop)
        do not generate sub images.
        images: add 'corner' key-value, file_name is not changed
            [{'corner': sub_image_corner ,'height': sub_image_h, 'width': sub_image_w, 'id': sub_image_id}..]
    for both conditions:
        annotations: [{'id': new_anno_id, 'image_id': sub_image_id, 'area': new_area, 'size': new_size,
            'bbox': new_bbox, 'segmentation': new_segmentation, ...}..]
    """

    def __init__(self, pieces=None, sub_image_size=None, overlap=(0, 0), anno_keep_overlap=0.7, *args, **kwargs):
        """
        :param pieces: tuple, (pw, ph), how many pieces to split each image.
        :param sub_image_size: tuple, (sw, sh), size of sub image.
        :param overlap: tuple, (ow, oh), overlap of two sub neighbour image.
        :param anno_keep_overlap: threshold forwarded to the box splitting of SplitImage.
        :param args, kwargs: forwarded to SplitImage.__init__.
        """
        super(COCOSplitImage, self).__init__(*args, **kwargs)
        self.pieces = pieces
        self.sub_image_size = sub_image_size
        self.overlap = overlap
        self.anno_keep_overlap = anno_keep_overlap

    def __turn_anno_to_sub_image_anno(self, bboxes, origin_annos, sub_anno_id, sub_image_id):
        """
        :param bboxes: [(x1, y1, x2, y2)], boxes relative to the sub image
        :param origin_annos: origin annotation dict of every box, same order as bboxes
        :param sub_anno_id: id given to the first new annotation; following ones count up
        :param sub_image_id: id of the sub image the annotations belong to
        :return: list of new annotation dicts (deep copies of the origin ones, geometry replaced)
        """
        bboxes = self.__turn_ndarray_to_list(xyxy2xywh(bboxes).astype('float32'))
        annos = []
        for new_id, (bbox, origin_anno) in enumerate(zip(bboxes, origin_annos), start=sub_anno_id):
            anno = deepcopy(origin_anno)
            x1, y1, w, h = bbox
            x2, y2 = x1 + w, y1 + h
            anno['bbox'] = bbox
            # The segmentation is replaced by the clipped box's rectangle.
            anno['segmentation'] = [[x1, y1, x1, y2, x2, y2, x2, y1]]
            anno['area'] = w * h
            anno['size'] = float(np.sqrt(anno['area']))
            anno['id'] = new_id
            anno['image_id'] = sub_image_id
            annos.append(anno)
        return annos

    def __turn_image_info_to_sub_image_infos(self, corner, image_info, sub_image_id, generate_sub_image:bool):
        """
        Build the 'images' entry of one sub image.

        :param corner: np.ndarray, (x1, y1, x2, y2) of the sub image
        :param image_info: 'images' entry of the origin image
        :param sub_image_id: id of the sub image
        :param generate_sub_image: True -> rename file_name after the corner,
            False -> keep file_name and store the corner under 'corner'
        """
        corner = self.__turn_ndarray_to_list(corner.astype('int32'))
        new_image_info = deepcopy(image_info)
        new_image_info['id'] = sub_image_id
        new_image_info['width'] = corner[2] - corner[0]
        new_image_info['height'] = corner[3] - corner[1]
        if generate_sub_image:
            f, ext = os.path.splitext(new_image_info['file_name'])
            new_image_info['file_name'] = '{}_{}_{}{}'.format(f, corner[0], corner[1], ext)
        else:
            new_image_info['corner'] = list(corner)
        return new_image_info

    def __turn_ndarray_to_list(self, v):
        """Recursively convert ndarrays (also nested in tuples/lists) to plain lists for JSON."""
        if isinstance(v, np.ndarray):
            return v.tolist()
        elif isinstance(v, (tuple, list)):
            return [self.__turn_ndarray_to_list(e) for e in v]
        return v

    def get_sub_image_corners_and_annos(self, image_info, annos, sub_anno_id, sub_image_id, generate_sub_image):
        """
        Split one image's info and annotations.

        :param image_info: 'images' entry of the origin image (needs 'width' and 'height')
        :param annos: list of annotation dicts of that image ('bbox' is (x, y, w, h))
        :param sub_anno_id: next free annotation id
        :param sub_image_id: next free image id
        :param generate_sub_image: see __turn_image_info_to_sub_image_infos
        :return: (corners, sub_annos, sub_image_infos, next free annotation id, next free image id)
        """
        if len(annos) == 0:
            boxes = np.array([]).reshape(0, 4)
        else:
            boxes = np.array([anno['bbox'] for anno in annos])
            boxes = xywh2xyxy(boxes)
        image_size = (image_info['width'], image_info['height'])
        corners, sub_annos, sub_keeps = super(COCOSplitImage, self)\
            .get_sub_image_corner_and_boxes(image_size, boxes, self.pieces, self.sub_image_size,
                                            self.overlap, self.anno_keep_overlap)
        sub_image_infos = []
        for i, (corner, sub_anno, sub_keep) in enumerate(zip(corners, sub_annos, sub_keeps)):
            sub_image_infos.append(
                self.__turn_image_info_to_sub_image_infos(corner, image_info, sub_image_id, generate_sub_image))
            if len(sub_anno) > 0:
                origin_anno = [annos[keep_id] for keep_id in sub_keep]
                sub_annos[i] = self.__turn_anno_to_sub_image_anno(sub_anno, origin_anno,
                                                                  sub_anno_id, sub_image_id)
            else:
                sub_annos[i] = []
            sub_image_id += 1
            sub_anno_id += len(sub_anno)
        return corners, sub_annos, sub_image_infos, sub_anno_id, sub_image_id

    def __load_coco_data_map(self, json_data):
        """Index a COCO dict: image ids, id -> image info, id -> annotation, image id -> annotation ids."""
        image_infos, annotations = json_data['images'], json_data['annotations']
        image_ids = [image_info['id'] for image_info in image_infos]
        image_id_to_annos_id = {image_id: [] for image_id in image_ids}
        image_id_to_image_info = {image_info['id']: image_info for image_info in image_infos}
        anno_id_to_anno = {anno['id']: anno for anno in annotations}
        for anno in annotations:
            # an image_id in anno must be contained in 'images' (KeyError otherwise)
            image_id_to_annos_id[anno['image_id']].append(anno['id'])
        return image_ids, image_id_to_image_info, anno_id_to_anno, image_id_to_annos_id

    def __save_annotations(self, json_data, src_annotation_path, dst_annotation_path):
        """
        Write json_data to dst_annotation_path (no-op when it is None).

        When dst_annotation_path is an existing directory the file name is derived from the
        source name plus the split setting ('_pw{}_ph{}' or '_sw{}_sh{}').
        """
        if dst_annotation_path is None:
            return
        if os.path.isdir(dst_annotation_path):
            f_dir, f_name = os.path.split(src_annotation_path)
            f, ext = os.path.splitext(f_name)
            if self.pieces is not None:
                pw, ph = self.pieces
                save_pth = os.path.join(dst_annotation_path, '{}_pw{}_ph{}{}'.format(f, pw, ph, ext))
            else:
                sw, sh = self.sub_image_size
                save_pth = os.path.join(dst_annotation_path, '{}_sw{}_sh{}{}'.format(f, sw, sh, ext))
        else:
            save_pth = dst_annotation_path
        print(save_pth, os.path.abspath(save_pth))
        # Context manager so the file is flushed and closed before returning.
        with open(save_pth, 'w') as fp:
            json.dump(json_data, fp)

    def cut_image_for_coco_json_dataset(self, src_annotation_path, dst_annotation_path=None,
                                        src_image_dir=None, dst_image_dir=None):
        """
        Split every image of a COCO annotation file.

        :param src_annotation_path: COCO json file to read
        :param dst_annotation_path: None (do not save), a file path, or an existing directory
        :param src_image_dir: directory the 'file_name' entries are relative to;
            required when dst_image_dir is given
        :param dst_image_dir: if given, sub images are cropped and saved below this directory
        :return: the new COCO dict; origin 'images' are kept under 'old_images'
        """
        assert dst_image_dir is None or src_image_dir is not None, \
            'sub_image_dir specified, will save sub images to disk, but src_image_dir not given,' \
            ' it needed to load origin image.'
        generate_sub_image = dst_image_dir is not None

        # get json file info
        with open(src_annotation_path) as fp:
            json_data = json.load(fp)
        image_ids, image_id_to_image_info, anno_id_to_anno, image_id_to_annos_id = self.__load_coco_data_map(json_data)

        new_annotations, new_image_infos = [], []
        sub_anno_id, sub_image_id = 0, 0
        for image_id in image_ids:
            # get image_info and annotations for an image
            image_info = image_id_to_image_info[image_id]
            annos = [anno_id_to_anno[anno_id] for anno_id in image_id_to_annos_id[image_id]]

            # get sub images' corner, annotations, image_info
            corners, sub_annos, sub_image_infos, sub_anno_id, sub_image_id = self.get_sub_image_corners_and_annos(
                image_info, annos, sub_anno_id, sub_image_id, generate_sub_image)

            # merge sub image infos and annotations
            for sub_anno in sub_annos:
                if len(sub_anno) > 0:
                    new_annotations.extend(sub_anno)
            new_image_infos.extend(sub_image_infos)

            # save sub image to disk.
            if generate_sub_image:
                # file_name is resolved against src_image_dir; an absolute file_name
                # still wins because os.path.join discards earlier components then.
                src_image_path = os.path.join(src_image_dir, image_info['file_name'])
                with Image.open(src_image_path) as image:
                    for corner, sub_image_info in zip(corners, sub_image_infos):
                        save_path = os.path.join(dst_image_dir, sub_image_info['file_name'])
                        save_dir = os.path.dirname(save_path)
                        if save_dir:
                            # file_name may contain sub directories
                            os.makedirs(save_dir, exist_ok=True)
                        image.crop(tuple(int(v) for v in corner)).save(save_path)

        json_data['annotations'] = new_annotations
        json_data['old_images'] = json_data['images']
        json_data['images'] = new_image_infos
        self.__save_annotations(json_data, src_annotation_path, dst_annotation_path)
        return json_data
class COCOMergeResult(MergeResult):
    """
    Merge COCO-style detection results of sub images back onto their origin images.

    The corner annotation file is expected to hold the sub images under 'images' (each with a
    'corner' entry and the origin 'file_name') and the origin images under 'old_images'.
    """

    def __turn_det_result(self, bbox, image_id, old_det_result):
        """Copy old_det_result, replacing 'bbox' by the merged (x1, y1, x2, y2) box as xywh and 'image_id'."""
        det_result = deepcopy(old_det_result)
        det_result['bbox'] = xyxy2xywh(np.array(bbox)).tolist()
        det_result['image_id'] = image_id
        return det_result

    def __load_coco_data_map(self, corner_gts):
        """
        :param corner_gts: corner annotation dict with 'images' (sub images) and 'old_images'
        :return: (origin image ids, sub image id -> sub image info, origin image id -> sub image ids)
        """
        merge_image_ids = [image_info['id'] for image_info in corner_gts['old_images']]
        image_id_to_image_info = {image_info['id']: image_info for image_info in corner_gts['images']}
        filename_to_merge_image_id = {image_info['file_name']: image_info['id'] for image_info in
                                      corner_gts['old_images']}
        merge_image_id_to_image_ids = {}
        for image_info in corner_gts['images']:
            # Sub images are matched to their origin image by the unchanged file_name.
            merge_image_id = filename_to_merge_image_id[image_info['file_name']]
            merge_image_id_to_image_ids.setdefault(merge_image_id, []).append(image_info['id'])
        assert len(merge_image_ids) == len(merge_image_id_to_image_ids)
        return merge_image_ids, image_id_to_image_info, merge_image_id_to_image_ids

    def __load_det_data_map(self, det_data):
        """Group the detection dicts by their 'image_id'."""
        image_id_to_det_boxes = {}
        for det_bbox in det_data:
            image_id_to_det_boxes.setdefault(det_bbox['image_id'], []).append(det_bbox)
        return image_id_to_det_boxes

    def __save_file(self, json_data, src_path, dst_path):
        """
        Dump json_data to dst_path; when dst_path is an existing directory the file is named
        '<src name>_merge_nms<threshold or None><ext>' inside it.

        :return: the path written to
        """
        if os.path.isdir(dst_path):
            f_dir, f_name = os.path.split(src_path)
            f, ext = os.path.splitext(f_name)
            save_pth = os.path.join(dst_path,
                                    '{}_merge_nms{}{}'.format(f, self.nms_th if self.use_nms else 'None', ext))
        else:
            save_pth = dst_path
        # Context manager so the file is flushed and closed before the path is returned.
        with open(save_pth, 'w') as fp:
            json.dump(json_data, fp)
        print('[COCOMergeResult]: save file to', os.path.abspath(save_pth))
        return save_pth

    def __call__(self, corner_gt_file_path, src_det_file_path, dst_det_file_path=None):
        """
        :param corner_gt_file_path: corner annotation json (sub images with 'corner')
        :param src_det_file_path: COCO detection result json on the sub images
        :param dst_det_file_path: None (do not save), a file path, or an existing directory
        :return: (merged detection dicts with origin image ids and xywh boxes, saved path or None)
        """
        with open(src_det_file_path) as fp:
            det_data = json.load(fp)
        with open(corner_gt_file_path) as fp:
            corner_gts = json.load(fp)

        # create data map
        merge_image_ids, image_id_to_image_info, merge_image_id_to_image_ids = self.__load_coco_data_map(corner_gts)
        image_id_to_det_boxes = {image_id: [] for image_id in image_id_to_image_info}
        image_id_to_det_boxes.update(self.__load_det_data_map(det_data))

        merge_image_id_to_det_results = {merge_id: [] for merge_id in merge_image_ids}
        merge_image_id_to_corners = {merge_id: [] for merge_id in merge_image_ids}
        for merge_image_id, image_ids in merge_image_id_to_image_ids.items():
            for image_id in image_ids:
                merge_image_id_to_det_results[merge_image_id].append(image_id_to_det_boxes[image_id])
                merge_image_id_to_corners[merge_image_id].append(image_id_to_image_info[image_id]['corner'])

        # merge all det result
        all_merge_det_results = []
        for merge_image_id in tqdm(merge_image_id_to_det_results):
            corners = merge_image_id_to_corners[merge_image_id]
            det_results_list = merge_image_id_to_det_results[merge_image_id]

            # merge all boxes to origin image; old_det_results stays aligned with the
            # translated boxes because empty sub results contribute to neither.
            old_det_results, det_bboxes = [], []
            for det_results in det_results_list:
                old_det_results.extend(det_results)
                det_bboxes.append(np.array([xywh2xyxy(np.array(det_result['bbox']))
                                            for det_result in det_results]))
            merge_boxes = self.translate_bboxes(corners, det_bboxes)

            # nms over boxes and scores only (category is not considered here);
            # skipped when the image has no detections at all.
            if self.use_nms and len(merge_boxes) > 0:
                scores = [det_result['score'] for det_result in old_det_results]
                merge_boxes, keeps = self.nms(merge_boxes, scores)
                old_det_results = [old_det_results[keep] for keep in keeps]

            # turn bbox to det_result
            all_merge_det_results.extend(
                self.__turn_det_result(bbox, merge_image_id, old_det_result)
                for bbox, old_det_result in zip(merge_boxes, old_det_results))

        save_pth = None
        if dst_det_file_path is not None:
            save_pth = self.__save_file(all_merge_det_results, src_det_file_path, dst_det_file_path)
        return all_merge_det_results, save_pth
if __name__ == '__main__':
    # Example run: merge the sub-image detections of the pw4_ph2 split back onto
    # the origin images and save the result next to the input bbox.json.
    #
    # Earlier configuration (pw10_ph5 split), kept for reference:
    #   '/home/hui/dataset/sanya/cocostyle_release/all/rgb/corner_annotations/tiny_all_rgb_test_coco_pw10_ph5.json',
    #   '../../outputs/tiny/scale/coco_pretrain_base4d2_1x/inference/tiny_all_rgb_corner_pw10_ph5_test_coco/bbox.json',
    #   '../../outputs/tiny/scale/coco_pretrain_base4d2_1x/inference/tiny_all_rgb_corner_pw10_ph5_test_coco/'
    merger = COCOMergeResult(use_nms=True)
    merger(
        corner_gt_file_path='/home/hui/dataset/sanya/cocostyle_release/all/rgb/corner_annotations/tiny_all_rgb_test_coco_pw4_ph2.json',
        src_det_file_path='/home/hui/github/TinyObject/pipeline/maskrcnn-benchmark/outputs/tiny/FPN_all_rgb_cut/coco_pretrain_more_DA2/inference/tiny_all_rgb_cut_test_coco/bbox.json',
        dst_det_file_path='/home/hui/github/TinyObject/pipeline/maskrcnn-benchmark/outputs/tiny/FPN_all_rgb_cut/coco_pretrain_more_DA2/inference/tiny_all_rgb_cut_test_coco/',
    )