def geojson2coco()

in libs/solaris/data/coco.py [0:0]


def geojson2coco(image_src, label_src, output_path=None, image_ext='.tif',
                 matching_re=None, category_attribute=None, score_attribute=None,
                 preset_categories=None, include_other=True, info_dict=None,
                 license_dict=None, recursive=False, override_crs=False,
                 explode_all_multipolygons=False, remove_all_multipolygons=False, 
                 verbose=0):
    """Generate COCO-formatted labels from one or multiple geojsons and images.

    This function ingests optionally georegistered polygon labels in geojson
    format alongside image(s) and generates .json files per the
    `COCO dataset specification`_ . Some models, like
    many Mask R-CNN implementations, require labels to be in this format. The
    function assumes you're providing image file(s) and geojson file(s) to
    create the dataset. If the number of images and geojsons are both > 1 (e.g.
    with a SpaceNet dataset), you must provide a regex pattern to extract
    matching substrings to match images to label files.

    .. _COCO dataset specification: http://cocodataset.org/

    Arguments
    ---------
    image_src : :class:`str` or :class:`list` or :class:`dict`
        Source image(s) to use in the dataset. This can be::

            1. a string path to an image,
            2. the path to a directory containing a bunch of images,
            3. a list of image paths,
            4. a dictionary corresponding to COCO-formatted image records, or
            5. a string path to a COCO JSON containing image records.

        If a directory, the `recursive` flag will be used to determine whether
        or not to descend into sub-directories.
    label_src : :class:`str` or :class:`list`
        Source labels to use in the dataset. This can be a string path to a
        geojson, the path to a directory containing multiple geojsons, or a
        list of geojson file paths. If a directory, the `recursive` flag will
        determine whether or not to descend into sub-directories.
    output_path : str, optional
        The path to save the JSON-formatted COCO records to. If not provided,
        the records will only be returned as a dict, and not saved to file.
    image_ext : str, optional
        The string to use to identify images when searching directories. Only
        has an effect if `image_src` is a directory path. Defaults to
        ``".tif"``.
    matching_re : str, optional
        A regular expression pattern to match filenames between `image_src`
        and `label_src` if both are directories of multiple files. This has
        no effect if those arguments do not both correspond to directories or
        lists of files. Will raise a ``ValueError`` if multiple files are
        provided for both `image_src` and `label_src` but no `matching_re` is
        provided.
    category_attribute : str, optional
        The name of an attribute in the geojson that specifies which category
        a given instance corresponds to. If not provided, it's assumed that
        only one class of object is present in the dataset, which will be
        termed ``"other"`` in the output json.
    score_attribute : str, optional
        The name of an attribute in the geojson that specifies the prediction
        confidence of a model
    preset_categories : :class:`list` of :class:`dict`s, optional
        A pre-set list of categories to use for labels. These categories should
        be formatted per
        `the COCO category specification`_.
        example:
        [{'id': 1, 'name': 'Fighter Jet', 'supercategory': 'plane'},
        {'id': 2, 'name': 'Military Bomber', 'supercategory': 'plane'}, ... ]
    include_other : bool, optional
        If set to ``True``, and `preset_categories` is provided, objects that
        don't fall into the specified categories will not be removed from the
        dataset. They will instead be passed into a category named ``"other"``
        with its own associated category ``id``. If ``False``, objects whose
        categories don't match a category from `preset_categories` will be
        dropped.
    info_dict : dict, optional
        A dictonary with the following key-value pairs::

            - ``"year"``: :class:`int` year of creation
            - ``"version"``: :class:`str` version of the dataset
            - ``"description"``: :class:`str` string description of the dataset
            - ``"contributor"``: :class:`str` who contributed the dataset
            - ``"url"``: :class:`str` URL where the dataset can be found
            - ``"date_created"``: :class:`datetime.datetime` when the dataset
                was created

    license_dict : dict, optional
        A dictionary containing the licensing information for the dataset, with
        the following key-value pairs::

            - ``"name": :class:`str` the name of the license.
            -  ``"url": :class:`str` a link to the dataset's license.

        *Note*: This implementation assumes that all of the data uses one
        license. If multiple licenses are provided, the image records will not
        be assigned a license ID.
    recursive : bool, optional
        If `image_src` and/or `label_src` are directories, setting this flag
        to ``True`` will induce solaris to descend into subdirectories to find
        files. By default, solaris does not traverse the directory tree.
    explode_all_multipolygons : bool, optional
        Explode the multipolygons into individual geometries using sol.utils.geo.split_multi_geometries. 
        Be sure to inspect which geometries are multigeometries, each individual geometries within these 
        may represent artifacts rather than true labels.
    remove_all_multipolygons : bool, optional
        Filters MultiPolygons and GeometryCollections out of each tile geodataframe. Alternatively you 
        can edit each polygon manually to be a polygon before converting to COCO format.
    verbose : int, optional
        Verbose text output. By default, none is provided; if ``True`` or
        ``1``, information-level outputs are provided; if ``2``, extremely
        verbose text is output.

    Returns
    -------
    coco_dataset : dict
        A dictionary following the `COCO dataset specification`_ . Depending
        on arguments provided, it may or may not include license and info
        metadata.
    """

    # first, convert both image_src and label_src to lists of filenames
    logger = logging.getLogger(__name__)
    logger.setLevel(_get_logging_level(int(verbose)))
    logger.debug('Preparing image filename: image ID dict.')
    # pdb.set_trace()
    if isinstance(image_src, str):
        if image_src.endswith('json'):
            logger.debug('COCO json provided. Extracting fname:id dict.')
            with open(image_src, 'r') as f:
                image_ref = json.load(f)
                image_ref = {image['file_name']: image['id']
                             for image in image_ref['images']}
        else:
            image_list = _get_fname_list(image_src, recursive=recursive,
                                         extension=image_ext)
            image_ref = dict(zip(image_list,
                                 list(range(1, len(image_list) + 1))
                                 ))
    elif isinstance(image_src, dict):
        logger.debug('image COCO dict provided. Extracting fname:id dict.')
        if 'images' in image_src.keys():
            image_ref = image_src['images']
        else:
            image_ref = image_src
        image_ref = {image['file_name']: image['id']
                     for image in image_ref}
    else:
        logger.debug('Non-COCO formatted image set provided. Generating '
                     'image fname:id dict with arbitrary ID integers.')
        image_list = _get_fname_list(image_src, recursive=recursive,
                                     extension=image_ext)
        image_ref = dict(zip(image_list, list(range(1, len(image_list) + 1))))

    logger.debug('Preparing label filename list.')
    label_list = _get_fname_list(label_src, recursive=recursive,
                                 extension='json')

    logger.debug('Checking if images and vector labels must be matched.')
    do_matches = len(image_ref) > 1 and len(label_list) > 1
    if do_matches:
        logger.info('Matching images to label files.')
        im_names = pd.DataFrame({'image_fname': list(image_ref.keys())})
        label_names = pd.DataFrame({'label_fname': label_list})
        logger.debug('Getting substrings for matching from image fnames.')
        if matching_re is not None:
            im_names['match_substr'] = im_names['image_fname'].str.extract(
                matching_re)
            logger.debug('Getting substrings for matching from label fnames.')
            label_names['match_substr'] = label_names[
                'label_fname'].str.extract(matching_re)
        else:
            logger.debug('matching_re is none, getting full filenames '
                         'without extensions for matching.')
            im_names['match_substr'] = im_names['image_fname'].apply(
                lambda x: os.path.splitext(os.path.split(x)[1])[0])
            im_names['match_substr'] = im_names['match_substr'].astype(
                str)
            label_names['match_substr'] = label_names['label_fname'].apply(
                lambda x: os.path.splitext(os.path.split(x)[1])[0])
            label_names['match_substr'] = label_names['match_substr'].astype(
                str)
        match_df = im_names.merge(label_names, on='match_substr', how='inner')

    logger.info('Loading labels.')
    label_df = pd.DataFrame({'label_fname': [],
                             'category_str': [],
                             'geometry': []})
    for gj in tqdm(label_list):
        logger.debug('Reading in {}'.format(gj))
        curr_gdf = gpd.read_file(gj)
        
        if remove_all_multipolygons is True and explode_all_multipolygons is True:
            raise ValueError("Only one of remove_all_multipolygons or explode_all_multipolygons can be set to True.")
        if remove_all_multipolygons is True and explode_all_multipolygons is False:
            curr_gdf = remove_multipolygons(curr_gdf)
        elif explode_all_multipolygons is True:
            curr_gdf = split_multi_geometries(curr_gdf)
        
        curr_gdf['label_fname'] = gj
        curr_gdf['image_fname'] = ''
        curr_gdf['image_id'] = np.nan
        if category_attribute is None:
            logger.debug('No category attribute provided. Creating a default '
                         '"other" category.')
            curr_gdf['category_str'] = 'other'  # add arbitrary value
            tmp_category_attribute = 'category_str'
        else:
            tmp_category_attribute = category_attribute
        if do_matches:  # multiple images: multiple labels
            logger.debug('do_matches is True, finding matching image')
            logger.debug('Converting to pixel coordinates.')
            if len(curr_gdf) > 0:  # if there are geoms, reproj to px coords
                curr_gdf = geojson_to_px_gdf(
                    curr_gdf,
                    override_crs=override_crs,
                    im_path=match_df.loc[match_df['label_fname'] == gj,
                                         'image_fname'].values[0])
                curr_gdf['image_id'] = image_ref[match_df.loc[
                    match_df['label_fname'] == gj, 'image_fname'].values[0]]
        # handle case with multiple images, one big geojson
        elif len(image_ref) > 1 and len(label_list) == 1:
            logger.debug('do_matches is False. Many images:1 label detected.')
            raise NotImplementedError('one label file: many images '
                                      'not implemented yet.')
        elif len(image_ref) == 1 and len(label_list) == 1:
            logger.debug('do_matches is False. 1 image:1 label detected.')
            logger.debug('Converting to pixel coordinates.')
            # match the two images
            curr_gdf = geojson_to_px_gdf(curr_gdf,
                                         override_crs=override_crs,
                                         im_path=list(image_ref.keys())[0])
            curr_gdf['image_id'] = list(image_ref.values())[0]
        curr_gdf = curr_gdf.rename(
            columns={tmp_category_attribute: 'category_str'})
        if score_attribute is not None:
            curr_gdf = curr_gdf[['image_id', 'label_fname', 'category_str',
                                 score_attribute, 'geometry']]
        else:
            curr_gdf = curr_gdf[['image_id', 'label_fname', 'category_str',
                                 'geometry']]
        label_df = pd.concat([label_df, curr_gdf], axis='index',
                             ignore_index=True, sort=False)

    logger.info('Finished loading labels.')
    logger.info('Generating COCO-formatted annotations.')
    coco_dataset = df_to_coco_annos(label_df,
                                    geom_col='geometry',
                                    image_id_col='image_id',
                                    category_col='category_str',
                                    score_col=score_attribute,
                                    preset_categories=preset_categories,
                                    include_other=include_other,
                                    verbose=verbose)

    logger.info('Generating COCO-formatted image and license records.')
    if license_dict is not None:
        logger.debug('Getting license ID.')
        if len(license_dict) == 1:
            logger.debug('Only one license present; assuming it applies to '
                         'all images.')
            license_id = 1
        else:
            logger.debug('Zero or multiple licenses present. Not trying to '
                         'match to images.')
            license_id = None
        logger.info('Adding licenses to dataset.')
        coco_licenses = []
        license_idx = 1
        for license_name, license_url in license_dict.items():
            coco_licenses.append({'name': license_name,
                                  'url': license_url,
                                  'id': license_idx})
            license_idx += 1
        coco_dataset['licenses'] = coco_licenses
    else:
        logger.debug('No license information provided, skipping for image '
                     'COCO records.')
        license_id = None
    coco_image_records = make_coco_image_dict(image_ref, license_id)
    coco_dataset['images'] = coco_image_records

    logger.info('Adding any additional information provided as arguments.')
    if info_dict is not None:
        coco_dataset['info'] = info_dict

    if output_path is not None:
        with open(output_path, 'w') as outfile:
            json.dump(coco_dataset, outfile)

    return coco_dataset