in jobs/socorro_import_crash_data.py [0:0]
def get_rows(schema):
    """Map the fields in a JSON schema to corresponding pyspark data structures.

    Walks ``schema["properties"]`` in sorted (deterministic) order and yields
    one ``StructField`` per property, recursing into nested objects and typed
    arrays.

    :param schema: dict-like JSON schema; must contain a ``properties`` mapping.
    :yields: ``pyspark.sql.types.StructField`` instances, one per property.
    :raises ValueError: if ``properties`` is missing, or a property declares a
        type this mapping does not support.
    """
    if "properties" not in schema:
        err_msg = "Invalid JSON schema: properties field is missing."
        log.error(err_msg)
        raise ValueError(err_msg)
    for prop in sorted(schema["properties"]):
        meta = schema["properties"][prop]
        # "type" may be a single string or a list of strings; the membership
        # tests below cover both forms.
        nullable = "null" in meta["type"]
        if "string" in meta["type"]:
            if "integer" in meta["type"]:
                # String wins when both are allowed, so no values are lost.
                log.debug("%r allows the type to be String AND Integer", prop)
            yield StructField(prop, StringType(), nullable)
        elif "integer" in meta["type"]:
            yield StructField(prop, IntegerType(), nullable)
        elif "boolean" in meta["type"]:
            yield StructField(prop, BooleanType(), nullable)
        elif meta["type"] == "array" and "items" not in meta:
            # No item schema given: assume an array of strings.
            yield StructField(prop, ArrayType(StringType(), False), True)
        elif meta["type"] == "array" and "items" in meta:
            # Array of objects: build the element struct from the item schema.
            struct = StructType()
            for row in get_rows(meta["items"]):
                struct.add(row)
            yield StructField(prop, ArrayType(struct), True)
        elif meta["type"] == "object":
            # Nested object: recurse with the property's own sub-schema.
            struct = StructType()
            for row in get_rows(meta):
                struct.add(row)
            yield StructField(prop, struct, True)
        else:
            # Truncate the offending schema fragment to keep the log readable.
            err_msg = f"Invalid JSON schema: {str(meta)[:100]}"
            log.error(err_msg)
            raise ValueError(err_msg)