nyc_taxis/_tools/parse.py (111 lines of code) (raw):
import csv
import json
import re
import sys
types = {}
for f in [
"vendor_id",
"cab_color",
"payment_type",
"trip_type",
"rate_code_id",
"store_and_fwd_flag",
]:
types[f] = "keyword"
for f in ["vendor_name"]:
types[f] = "text"
for f in ["passenger_count"]:
types[f] = "integer"
for f in ["pickup_location", "dropoff_location"]:
types[f] = "geo_point"
for f in [
"trip_distance",
"fare_amount",
"surcharge",
"mta_tax",
"extra",
"ehail_fee",
"improvement_surcharge",
"tip_amount",
"tolls_amount",
"total_amount",
]:
types[f] = "scaled_float"
for f in ["pickup_datetime", "dropoff_datetime"]:
types[f] = "date"
def write_mappings():
mappings = {}
for k, v in types.items():
mappings[k] = {"type": v}
if v == "date":
mappings[k]["format"] = "yyyy-MM-dd HH:mm:ss"
elif v == "scaled_float":
mappings[k]["scaling_factor"] = 100
mappings = {"properties": mappings}
mappings["_all"] = {"enabled": False}
mappings["dynamic"] = "strict"
mappings = {"type": mappings}
print(json.dumps(mappings, indent=2))
def to_geo_point(d, f):
lat_field = f + "_latitude"
lon_field = f + "_longitude"
if lat_field in d and lon_field in d:
longitude = float(d[lon_field])
latitude = float(d[lat_field])
if longitude < -180 or longitude > 180 or latitude < -90 or latitude > 90:
raise Exception("Malformed coordinates")
d[f + "_location"] = [float(d[lon_field]), float(d[lat_field])]
del d[lon_field]
del d[lat_field]
def to_underscore(s):
s = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", s)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s).lower()
def to_json(f):
fields = []
for field in f.readline().strip().split(","):
field = to_underscore(field)
if field.startswith("tpep_") or field.startswith("lpep_"):
field = field[5:]
elif field == "ratecode_id":
field = "rate_code_id"
fields.append(field)
for line in f.readlines():
cols = line.strip().split(",")
if len(cols) < len(fields):
raise Exception("Cannot parse '%s': number of fields does not match '%s'" % (line, ",".join(fields)))
try:
d = {}
for i in range(len(fields)):
field = fields[i]
value = cols[i]
if value != "": # the way csv says the field does not exist
d[field] = value
to_geo_point(d, "pickup")
to_geo_point(d, "dropoff")
for k, v in d.items():
if k not in types:
raise Exception("Unknown field '%s'" % k)
t = types[k]
try:
if t == "integer":
d[k] = int(v)
elif t == "float":
d[k] = float(v)
except Exception as cause:
raise Exception("Cannot parse (%s,%s)" % (k, v)) from cause
print(json.dumps(d))
except KeyboardInterrupt:
break
except Exception as e:
print(
"Skipping malformed entry '%s' because of %s" % (line, str(e)),
file=sys.stderr,
)
if sys.argv[1] == "json":
for file_name in sys.argv[2:]:
with open(file_name) as f:
to_json(f)
elif sys.argv[1] == "mappings":
write_mappings()
else:
raise Exception("Expected 'json' or 'mappings' but got %s" % sys.argv[1])