# get_rows() — from jobs/socorro_import_crash_data.py

def get_rows(schema):
    """Map the fields in a JSON schema to corresponding pyspark structures.

    Yields one ``StructField`` per entry in ``schema["properties"]``, in
    sorted property order. Nested ``object`` schemas and ``array``-of-object
    schemas are handled recursively.

    Args:
        schema: A JSON-schema dict; must contain a "properties" mapping.
            Each property's "type" may be a single type name or a list of
            type names (e.g. ["string", "null"]).

    Yields:
        StructField: one field per property; nullable when "null" appears
        in the property's "type".

    Raises:
        ValueError: if "properties" is missing, or a property has a type
            this mapping does not support.
    """
    if "properties" not in schema:
        err_msg = "Invalid JSON schema: properties field is missing."
        log.error(err_msg)
        raise ValueError(err_msg)

    for prop in sorted(schema["properties"]):
        meta = schema["properties"][prop]
        # Nullability applies to the scalar branches; arrays/objects below
        # are always emitted as nullable, matching the original behavior.
        nullable = "null" in meta["type"]
        if "string" in meta["type"]:
            # A property may allow both "string" and "integer"; StringType
            # can represent either, so string wins. Only log when the
            # ambiguity actually exists (previously this fired for every
            # string property, and via the root logger rather than `log`).
            if "integer" in meta["type"]:
                log.debug(f"{prop!r} allows the type to be String AND Integer")
            yield StructField(prop, StringType(), nullable)
        elif "integer" in meta["type"]:
            yield StructField(prop, IntegerType(), nullable)
        elif "boolean" in meta["type"]:
            yield StructField(prop, BooleanType(), nullable)
        elif meta["type"] == "array" and "items" not in meta:
            # No item schema given — assume an array of strings.
            yield StructField(prop, ArrayType(StringType(), False), True)
        elif meta["type"] == "array" and "items" in meta:
            # Array of objects: build the element struct from the item schema.
            struct = StructType()
            for row in get_rows(meta["items"]):
                struct.add(row)
            yield StructField(prop, ArrayType(struct), True)
        elif meta["type"] == "object":
            # Nested object: recurse into its own "properties".
            struct = StructType()
            for row in get_rows(meta):
                struct.add(row)
            yield StructField(prop, struct, True)
        else:
            # Truncate meta so a huge schema fragment doesn't flood the log.
            err_msg = f"Invalid JSON schema: {str(meta)[:100]}"
            log.error(err_msg)
            raise ValueError(err_msg)