in 04_streaming/transform/df05.py [0:0]
def run(*, runner='DirectRunner',
        airports_path='airports.csv.gz',
        flights_path='flights_sample.json',
        flights_out='all_flights',
        events_out='all_events'):
    """Build and run the flight time-zone-correction pipeline.

    Airports branch: read ``airports_path`` as text, keep only lines that
    contain "United States", parse each line as one CSV record, and emit
    ``(fields[0], addtimezone(fields[21], fields[26]))`` pairs.
    NOTE(review): columns 0/21/26 are presumably airport id, latitude and
    longitude -- confirm against the airports file schema.

    Flights branch: read ``flights_path`` as one JSON document per line and
    pass each one through ``tz_correct`` (via FlatMap, so it may emit zero or
    more records), with the airports pairs supplied as a dict side input.
    The corrected flights are serialized back to JSON lines under
    ``flights_out``. Each corrected flight is also expanded by
    ``get_next_event`` (FlatMap), and those events are written as JSON lines
    under ``events_out``.

    All parameters are keyword-only and default to the values that were
    previously hard-coded, so a bare ``run()`` behaves as before.

    Args:
        runner: Beam runner passed to ``beam.Pipeline``.
        airports_path: Airports text file to read.
        flights_path: Flights file, one JSON document per line.
        flights_out: Output path prefix for the corrected flights.
        events_out: Output path prefix for the derived events.
    """
    with beam.Pipeline(runner) as pipeline:
        airports = (
            pipeline
            | 'airports:read' >> beam.io.ReadFromText(airports_path)
            # Explicit label: Beam's default label for a lambda embeds its
            # source line number, so it would shift whenever the file changes.
            | 'airports:onlyUSA' >> beam.Filter(
                lambda line: "United States" in line)
            | 'airports:fields' >> beam.Map(
                lambda line: next(csv.reader([line])))
            | 'airports:tz' >> beam.Map(
                lambda fields: (fields[0],
                                addtimezone(fields[21], fields[26])))
        )

        flights = (
            pipeline
            | 'flights:read' >> beam.io.ReadFromText(flights_path)
            | 'flights:parse' >> beam.Map(json.loads)
            # The airports (key, value) pairs are handed to tz_correct as a
            # dict side input.
            | 'flights:tzcorr' >> beam.FlatMap(
                tz_correct, beam.pvalue.AsDict(airports))
        )

        (
            flights
            | 'flights:tostring' >> beam.Map(json.dumps)
            | 'flights:out' >> beam.io.textio.WriteToText(flights_out)
        )

        # Explicit label for the same reason as 'airports:onlyUSA' above:
        # keep step names stable and readable in the pipeline graph.
        events = flights | 'flights:events' >> beam.FlatMap(get_next_event)

        (
            events
            | 'events:tostring' >> beam.Map(json.dumps)
            | 'events:out' >> beam.io.textio.WriteToText(events_out)
        )