noaa/_tools/process.py (170 lines of code) (raw):

#################################################################### # # process the csv file into Elasticsearch json documents # #################################################################### import csv import json import os from datetime import datetime stationsFile = "ghcnd-stations.txt" countriesFile = "ghcnd-countries.txt" statesFile = "ghcnd-states.txt" weatherDataFiles = ["2014-sorted.csv", "2015-sorted.csv", "2016-sorted.csv"] indexPrefix = "weather-data" docType = "summary" def loadStatesFile(statesFile): statesMap = {} with open(statesFile, "r") as file: csvreader = csv.reader(file, delimiter=" ", quotechar='"') for row in csvreader: statesMap[row[0].strip()] = row[1].strip() return statesMap def loadCountriesFile(countriesFile): countriesMap = {} with open(countriesFile, "r") as file: csvreader = csv.reader(file, delimiter=" ", quotechar='"') for row in csvreader: countriesMap[row[0].strip()] = row[1].strip() return countriesMap def loadStationsFile(stationsFile, statesFile, countriesFile): statesMap = loadStatesFile(statesFile) countriesMap = loadCountriesFile(countriesFile) stationsMap = {} with open(stationsFile, "r") as file: for row in file: try: station = {} station["id"] = row[0:11].strip() countryCode = row[0:2].strip() if len(countryCode) > 0: station["country_code"] = countryCode station["country"] = countriesMap[countryCode] station["location"] = { "lat": float(row[12:20].strip()), "lon": float(row[21:30].strip()), } station["elevation"] = float(row[31:37].strip()) if countryCode == "US": stateCode = row[38:40].strip() if len(stateCode) > 0: station["state_code"] = stateCode station["state"] = statesMap[stateCode] station["name"] = row[41:71].strip() gsn_flag = row[72:75].strip() if len(gsn_flag) > 0: station["gsn_flag"] = gsn_flag hcn_crn_flag = row[76:78].strip() if len(hcn_crn_flag) > 0: station["hcn_crn_flag"] = hcn_crn_flag wmo_id = row[80:85].strip() if len(wmo_id) > 0: station["wmo_id"] = wmo_id stationsMap[station["id"]] = station except: print(row) raise e return stationsMap def processWeatherDoc(currentStationDoc): if "TMAX" in currentStationDoc: currentStationDoc["TMAX"] = float(currentStationDoc["TMAX"]) / 10.0 if "TMIN" in currentStationDoc: currentStationDoc["TMIN"] = float(currentStationDoc["TMIN"]) / 10.0 if "PRCP" in currentStationDoc: currentStationDoc["PRCP"] = float(currentStationDoc["PRCP"]) / 10.0 if "AWND" in currentStationDoc: currentStationDoc["AWND"] = float(currentStationDoc["AWND"]) / 10.0 if "EVAP" in currentStationDoc: currentStationDoc["EVAP"] = float(currentStationDoc["EVAP"]) / 10.0 if "MDEV" in currentStationDoc: currentStationDoc["MDEV"] = float(currentStationDoc["MDEV"]) / 10.0 if "MDPR" in currentStationDoc: currentStationDoc["MDPR"] = float(currentStationDoc["MDPR"]) / 10.0 if "MDTN" in currentStationDoc: currentStationDoc["MDTN"] = float(currentStationDoc["MDTN"]) / 10.0 if "MDTX" in currentStationDoc: currentStationDoc["MDTX"] = float(currentStationDoc["MDTX"]) / 10.0 if "MNPN" in currentStationDoc: currentStationDoc["MNPN"] = float(currentStationDoc["MNPN"]) / 10.0 if "MXPN" in currentStationDoc: currentStationDoc["MXPN"] = float(currentStationDoc["MXPN"]) / 10.0 if "TAVG" in currentStationDoc: currentStationDoc["TAVG"] = float(currentStationDoc["TAVG"]) / 10.0 if "THIC" in currentStationDoc: currentStationDoc["THIC"] = float(currentStationDoc["THIC"]) / 10.0 if "TOBS" in currentStationDoc: currentStationDoc["TOBS"] = float(currentStationDoc["TOBS"]) / 10.0 if "WESD" in currentStationDoc: currentStationDoc["WESD"] = float(currentStationDoc["WESD"]) / 10.0 if "WESF" in currentStationDoc: currentStationDoc["WESF"] = float(currentStationDoc["WESF"]) / 10.0 if "WSF1" in currentStationDoc: currentStationDoc["WSF1"] = float(currentStationDoc["WSF1"]) / 10.0 if "WSF2" in currentStationDoc: currentStationDoc["WSF2"] = float(currentStationDoc["WSF2"]) / 10.0 if "WSF5" in currentStationDoc: currentStationDoc["WSF5"] = float(currentStationDoc["WSF5"]) / 10.0 if "WSFG" in currentStationDoc: currentStationDoc["WSFG"] = float(currentStationDoc["WSFG"]) / 10.0 if "WSFI" in currentStationDoc: currentStationDoc["WSFI"] = float(currentStationDoc["WSFI"]) / 10.0 if "WSFM" in currentStationDoc: currentStationDoc["WSFM"] = float(currentStationDoc["WSFM"]) / 10.0 if "TMIN" in currentStationDoc and "TMAX" in currentStationDoc: if currentStationDoc["TMIN"] > currentStationDoc["TMAX"]: tmp = currentStationDoc["TMIN"] currentStationDoc["TMIN"] = currentStationDoc["TMAX"] currentStationDoc["TMAX"] = tmp currentStationDoc["TRANGE"] = { "gte": currentStationDoc["TMIN"], "lte": currentStationDoc["TMAX"], } if "MDTN" in currentStationDoc and "MDTX" in currentStationDoc: if currentStationDoc["MDTN"] > currentStationDoc["MDTX"]: tmp = currentStationDoc["MDTN"] currentStationDoc["MDTN"] = currentStationDoc["MDTX"] currentStationDoc["MDTX"] = tmp currentStationDoc["MDTRANGE"] = { "gte": currentStationDoc["MDTN"], "lte": currentStationDoc["MDTX"], } indexDoc = { "_op_type": "create", "_index": indexPrefix + "-" + str(currentStationDoc["date"].year), "_type": docType, "_id": currentStationDoc["date"].strftime("%Y-%m-%d") + "-" + currentStationDoc["station"]["id"], "_source": currentStationDoc, } return indexDoc def processWeatherFile(weatherDataFile, stationsMap): with open(weatherDataFile, "r") as file: csvreader = csv.reader(file, delimiter=",", quotechar='"') currentStationDoc = None stationDocsProcessed = 0 for row in csvreader: station = stationsMap[row[0]] date = datetime.strptime(row[1], "%Y%m%d") elementType = row[2] elementValue = row[3] if currentStationDoc == None: currentStationDoc = { "station": station, "date": date, elementType: elementValue, } elif currentStationDoc["station"] != station or currentStationDoc["date"] != date: yield processWeatherDoc(currentStationDoc) stationDocsProcessed = stationDocsProcessed + 1 currentStationDoc = { "station": station, "date": date, elementType: elementValue, } else: currentStationDoc[elementType] = elementValue stationsMap = loadStationsFile(stationsFile, statesFile, countriesFile) outFile = "documents.json" with open(outFile, "w+") as file: count = 0 for weatherDataFile in weatherDataFiles: for doc in processWeatherFile(weatherDataFile, stationsMap): doc["_source"]["date"] = doc["_source"]["date"].isoformat() file.write(json.dumps(doc["_source"])) file.write("\n") count = count + 1 print("Wrote " + str(count) + " entries")