cli/foundation-models/system/finetune/video-multi-object-tracking/cocovid2jsonl.py (127 lines of code) (raw):
import json
import os
import sys
import argparse
# Define Converters
class CocoVidToJSONLinesConverter:
def convert(self):
raise NotImplementedError
class BoundingBoxConverter(CocoVidToJSONLinesConverter):
"""example output for object tracking jsonl:
{
"image_url":"azureml://subscriptions/<my-subscription-id>/resourcegroups/<my-resource-group>/workspaces/<my-workspace>/datastores/<my-datastore>/paths/<path_to_image>",
"image_details":{
"format":"image_format",
"width":"image_width",
"height":"image_height"
},
"video_details": {
"frame_id": "zero_based_frame_id(int)",
"video_name": "video_name",
},
"label":[
{
"label":"class_name_1",
"topX":"xmin/width",
"topY":"ymin/height",
"bottomX":"xmax/width",
"bottomY":"ymax/height",
"isCrowd":"isCrowd"
"instance_id": "instance_id"
},
{
"label":"class_name_2",
"topX":"xmin/width",
"topY":"ymin/height",
"bottomX":"xmax/width",
"bottomY":"ymax/height",
"instance_id": "instance_id"
},
"..."
]
}
"""
def __init__(self, coco_data):
self.json_lines_data = []
self.categories = {}
self.coco_data = coco_data
self.image_id_to_data_index = {}
self.video_id_to_name = {}
for i in range(0, len(coco_data["images"])):
self.json_lines_data.append({})
self.json_lines_data[i]["image_url"] = ""
self.json_lines_data[i]["image_details"] = {}
self.json_lines_data[i]["video_details"] = {}
self.json_lines_data[i]["label"] = []
for i in range(0, len(coco_data["categories"])):
self.categories[coco_data["categories"][i]["id"]] = coco_data["categories"][
i
]["name"]
for i in range(0, len(coco_data["videos"])):
self.video_id_to_name[coco_data["videos"][i]["id"]] = coco_data["videos"][
i
]["name"]
def _populate_image_url(self, index, coco_image):
self.json_lines_data[index]["image_url"] = coco_image["file_name"]
self.image_id_to_data_index[coco_image["id"]] = index
def _populate_image_details(self, index, coco_image):
file_name = coco_image["file_name"]
self.json_lines_data[index]["image_details"]["format"] = file_name[
file_name.rfind(".") + 1 :
]
self.json_lines_data[index]["image_details"]["width"] = coco_image["width"]
self.json_lines_data[index]["image_details"]["height"] = coco_image["height"]
def _populate_video_details(self, index, coco_image):
self.json_lines_data[index]["video_details"]["frame_id"] = coco_image[
"frame_id"
]
self.json_lines_data[index]["video_details"][
"video_name"
] = self.video_id_to_name[coco_image["video_id"]]
def _populate_bbox_in_label(self, label, annotation, image_details):
# if bbox comes as normalized, skip normalization.
if max(annotation["bbox"]) < 1.5:
width = 1
height = 1
else:
width = image_details["width"]
height = image_details["height"]
label["topX"] = annotation["bbox"][0] / width
label["topY"] = annotation["bbox"][1] / height
label["bottomX"] = (annotation["bbox"][0] + annotation["bbox"][2]) / width
label["bottomY"] = (annotation["bbox"][1] + annotation["bbox"][3]) / height
def _populate_label(self, annotation):
index = self.image_id_to_data_index[annotation["image_id"]]
image_details = self.json_lines_data[index]["image_details"]
label = {"label": self.categories[annotation["category_id"]]}
self._populate_bbox_in_label(label, annotation, image_details)
self._populate_instanceId(label, annotation)
self._populate_isCrowd(label, annotation)
self._populate_visibility(label, annotation)
self.json_lines_data[index]["label"].append(label)
def _populate_instanceId(self, label, annotation):
label["instance_id"] = annotation["instance_id"]
def _populate_isCrowd(self, label, annotation):
if "iscrowd" in annotation.keys():
label["isCrowd"] = int(annotation["iscrowd"])
def _populate_visibility(self, label, annotation):
if "visibility" in annotation.keys():
label["visibility"] = annotation["visibility"]
def convert(self):
for i in range(0, len(self.coco_data["images"])):
self._populate_image_url(i, self.coco_data["images"][i])
self._populate_image_details(i, self.coco_data["images"][i])
self._populate_video_details(i, self.coco_data["images"][i])
if "annotations" not in self.coco_data:
self.coco_data["annotations"] = []
for i in range(0, len(self.coco_data["annotations"])):
self._populate_label(self.coco_data["annotations"][i])
return self.json_lines_data
def main(args):
input_coco_file_path = args.input_cocovid_file_path
output_dir = args.output_dir
output_file_path = output_dir + "/" + args.output_file_name
print(output_file_path)
task_type = args.task_type
base_url = args.base_url
def read_coco_file(coco_file):
with open(coco_file) as f_in:
return json.load(f_in)
def write_json_lines(converter, filename, base_url=None):
json_lines_data = converter.convert()
with open(filename, "w") as outfile:
for json_line in json_lines_data:
if base_url is not None:
image_url = json_line["image_url"]
json_line["image_url"] = os.path.join(base_url, image_url)
json_line["image_url"] = json_line["image_url"].replace("\\", "/")
json.dump(json_line, outfile, separators=(",", ":"))
outfile.write("\n")
print(f"Conversion completed. Converted {len(json_lines_data)} lines.")
coco_data = read_coco_file(input_coco_file_path)
print(f"Converting for {task_type}")
if task_type == "ObjectTracking":
converter = BoundingBoxConverter(coco_data)
write_json_lines(converter, output_file_path, base_url)
else:
print("ERROR: Invalid Task Type")
pass
if __name__ == "__main__":
# Parse arguments that are passed into the script
parser = argparse.ArgumentParser()
parser.add_argument("--input_cocovid_file_path", type=str, required=True)
parser.add_argument("--output_dir", type=str, required=True)
parser.add_argument("--output_file_name", type=str, required=True)
parser.add_argument(
"--task_type",
type=str,
required=True,
choices=["ObjectTracking"],
default="ObjectTracking",
)
parser.add_argument("--base_url", type=str, default=None)
args = parser.parse_args()
main(args)