nyc_taxis/_tools/parse.py (111 lines of code) (raw):

"""Convert NYC taxi trip CSV files to JSON documents, or print their mappings.

Usage:
    parse.py json FILE [FILE ...]   one JSON document per CSV row on stdout
    parse.py mappings               the mapping definition on stdout
"""

import csv
import json
import math
import re
import sys

# Mapping type for every field that may appear in an output document, grouped
# by type.  The order here is the order in which ``write_mappings`` emits them.
_FIELDS_BY_TYPE = (
    (
        "keyword",
        (
            "vendor_id",
            "cab_color",
            "payment_type",
            "trip_type",
            "rate_code_id",
            "store_and_fwd_flag",
        ),
    ),
    ("text", ("vendor_name",)),
    ("integer", ("passenger_count",)),
    ("geo_point", ("pickup_location", "dropoff_location")),
    (
        "scaled_float",
        (
            "trip_distance",
            "fare_amount",
            "surcharge",
            "mta_tax",
            "extra",
            "ehail_fee",
            "improvement_surcharge",
            "tip_amount",
            "tolls_amount",
            "total_amount",
        ),
    ),
    ("date", ("pickup_datetime", "dropoff_datetime")),
)

# Field name -> mapping type.  Rows containing a field not listed here are
# rejected by ``to_json``.
types = {
    field: type_name for type_name, names in _FIELDS_BY_TYPE for field in names
}

# Mapping types whose CSV values are converted to Python floats.  "float" is
# accepted alongside "scaled_float" so either spelling in ``types`` converts.
_FLOAT_TYPES = ("float", "scaled_float")

# Patterns used by ``to_underscore`` to split CamelCase words.
_CAMEL_WORD = re.compile(r"(.)([A-Z][a-z]+)")
_CAMEL_BOUNDARY = re.compile(r"([a-z0-9])([A-Z])")


def write_mappings():
    """Print the mapping definition for all known fields as indented JSON.

    Date fields get the ``yyyy-MM-dd HH:mm:ss`` format and scaled floats a
    scaling factor of 100.
    """
    properties = {}
    for name, type_name in types.items():
        spec = {"type": type_name}
        if type_name == "date":
            spec["format"] = "yyyy-MM-dd HH:mm:ss"
        elif type_name == "scaled_float":
            spec["scaling_factor"] = 100
        properties[name] = spec
    mappings = {
        "type": {
            "properties": properties,
            "_all": {"enabled": False},
            "dynamic": "strict",
        }
    }
    print(json.dumps(mappings, indent=2))


def to_geo_point(d, f):
    """Merge ``<f>_longitude``/``<f>_latitude`` in ``d`` into ``<f>_location``.

    The new value is a ``[longitude, latitude]`` list of floats and the two
    source keys are removed.  ``d`` is modified in place; nothing happens
    unless both source keys are present.

    Raises:
        ValueError: if a coordinate is not a valid float literal.
        Exception: if a coordinate is out of range or NaN.
    """
    lat_field = f + "_latitude"
    lon_field = f + "_longitude"
    if lat_field not in d or lon_field not in d:
        return
    longitude = float(d[lon_field])
    latitude = float(d[lat_field])
    # Written as a positive range test so that NaN, for which every
    # comparison is False, is rejected as well.
    if not (-180 <= longitude <= 180 and -90 <= latitude <= 90):
        raise Exception("Malformed coordinates")
    d[f + "_location"] = [longitude, latitude]
    del d[lon_field]
    del d[lat_field]


def to_underscore(s):
    """Return ``s`` converted from CamelCase to lower-case snake_case."""
    s = _CAMEL_WORD.sub(r"\1_\2", s)
    return _CAMEL_BOUNDARY.sub(r"\1_\2", s).lower()


def _read_header(f):
    """Read the CSV header line from ``f`` and return normalised field names."""
    fields = []
    for field in f.readline().strip().split(","):
        field = to_underscore(field)
        if field.startswith(("tpep_", "lpep_")):
            # Drop the five-character prefix so both variants share one name.
            field = field[5:]
        elif field == "ratecode_id":
            field = "rate_code_id"
        fields.append(field)
    return fields


def _convert(field, value):
    """Return ``value`` converted according to the mapping type of ``field``.

    Raises:
        Exception: if ``field`` is unknown or ``value`` cannot be converted.
    """
    if field not in types:
        raise Exception("Unknown field '%s'" % field)
    type_name = types[field]
    try:
        if type_name == "integer":
            return int(value)
        if type_name in _FLOAT_TYPES:
            number = float(value)
            if not math.isfinite(number):
                # json.dumps would print NaN/Infinity, which is not JSON.
                raise ValueError("non-finite number")
            return number
    except Exception as cause:
        raise Exception("Cannot parse (%s,%s)" % (field, value)) from cause
    return value


def _to_document(fields, cols):
    """Build one output document from the column values of a CSV row."""
    # An empty column is how the CSV expresses that the field does not exist.
    # Columns beyond the header length are ignored by zip().
    d = {field: value for field, value in zip(fields, cols) if value != ""}
    to_geo_point(d, "pickup")
    to_geo_point(d, "dropoff")
    return {field: _convert(field, value) for field, value in d.items()}


def to_json(f):
    """Print every data row of the CSV file object ``f`` as a JSON document.

    The first line is the header.  Each following row is written to stdout
    as one JSON object per line.  Rows that cannot be converted are reported
    on stderr and skipped.  The file is streamed line by line.

    Raises:
        Exception: if a row has fewer columns than the header.  This is not
            treated as a skippable row and aborts processing.
    """
    fields = _read_header(f)
    for line in f:
        cols = line.strip().split(",")
        if len(cols) < len(fields):
            raise Exception(
                "Cannot parse '%s': number of fields does not match '%s'"
                % (line, ",".join(fields))
            )
        try:
            print(json.dumps(_to_document(fields, cols)))
        except KeyboardInterrupt:
            break
        except Exception as e:
            print(
                "Skipping malformed entry '%s' because of %s" % (line, str(e)),
                file=sys.stderr,
            )


def main(argv=None):
    """Run the command line interface.

    Args:
        argv: arguments without the program name; defaults to
            ``sys.argv[1:]``.

    Raises:
        Exception: if the mode is missing or is neither ``json`` nor
            ``mappings``.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    if not args:
        raise Exception("Expected 'json' or 'mappings' but got no arguments")
    mode = args[0]
    if mode == "json":
        for file_name in args[1:]:
            with open(file_name) as f:
                to_json(f)
    elif mode == "mappings":
        write_mappings()
    else:
        raise Exception("Expected 'json' or 'mappings' but got %s" % mode)


if __name__ == "__main__":
    main()