04_streaming/realtime/avg02.py [22:63]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Timestamp layout "YYYY-MM-DDTHH:MM:SS" (no fractional seconds, no timezone
# suffix); presumably used with datetime strftime/strptime elsewhere in the file.
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S"


def compute_stats(airport, events):
    """Summarize the events grouped under one airport.

    Args:
        airport: Key the events were grouped under; copied into the result
            as ``AIRPORT``.
        events: Iterable of event dicts (any iterable, including a one-shot
            generator). Every event needs ``EVENT_TYPE`` and ``EVENT_TIME``;
            'arrived' events also need ``ARR_DELAY`` and 'departed' events
            need ``DEP_DELAY``.

    Returns:
        Dict with keys:
          AIRPORT: ``airport`` unchanged.
          AVG_ARR_DELAY: mean ``ARR_DELAY`` over 'arrived' events, or None
              if there are none.
          AVG_DEP_DELAY: mean ``DEP_DELAY`` over 'departed' events, or None
              if there are none.
          NUM_FLIGHTS: number of events of any type (not distinct flights).
          START_TIME / END_TIME: smallest / largest ``EVENT_TIME``, or None
              when ``events`` is empty.
    """
    # Materialize once: the events are walked several times below and need
    # len(), which a generator or other one-shot iterable cannot support.
    events = list(events)

    arrived = [event['ARR_DELAY'] for event in events if event['EVENT_TYPE'] == 'arrived']
    avg_arr_delay = float(np.mean(arrived)) if arrived else None

    departed = [event['DEP_DELAY'] for event in events if event['EVENT_TYPE'] == 'departed']
    avg_dep_delay = float(np.mean(departed)) if departed else None

    num_flights = len(events)

    # NOTE(review): assumes EVENT_TIME values order chronologically when
    # compared directly (e.g. zero-padded strings in DATETIME_FORMAT) — confirm.
    event_times = [event['EVENT_TIME'] for event in events]
    start_time = min(event_times, default=None)
    latest_time = max(event_times, default=None)

    return {
        'AIRPORT': airport,
        'AVG_ARR_DELAY': avg_arr_delay,
        'AVG_DEP_DELAY': avg_dep_delay,
        'NUM_FLIGHTS': num_flights,
        'START_TIME': start_time,
        'END_TIME': latest_time
    }


def by_airport(event):
    """Pair an event with the airport it should be grouped under.

    'departed' events are keyed by their ORIGIN airport; events of every
    other type are keyed by their DEST airport.

    Returns:
        ``(airport, event)`` tuple; ``event`` is the same object passed in.
    """
    airport_field = 'ORIGIN' if event['EVENT_TYPE'] == 'departed' else 'DEST'
    return event[airport_field], event


def run(project, bucket, region):
    argv = [
        '--project={0}'.format(project),
        '--job_name=ch04avgdelay',
        '--streaming',
        '--save_main_session',
        '--staging_location=gs://{0}/flights/staging/'.format(bucket),
        '--temp_location=gs://{0}/flights/temp/'.format(bucket),
        '--autoscaling_algorithm=THROUGHPUT_BASED',
        '--max_num_workers=8',
        '--region={}'.format(region),
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



04_streaming/realtime/avg03.py [22:63]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Timestamp layout "YYYY-MM-DDTHH:MM:SS" (no fractional seconds, no timezone
# suffix); presumably used with datetime strftime/strptime elsewhere in the file.
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S"


def compute_stats(airport, events):
    """Summarize the events grouped under one airport.

    Args:
        airport: Key the events were grouped under; copied into the result
            as ``AIRPORT``.
        events: Iterable of event dicts (any iterable, including a one-shot
            generator). Every event needs ``EVENT_TYPE`` and ``EVENT_TIME``;
            'arrived' events also need ``ARR_DELAY`` and 'departed' events
            need ``DEP_DELAY``.

    Returns:
        Dict with keys:
          AIRPORT: ``airport`` unchanged.
          AVG_ARR_DELAY: mean ``ARR_DELAY`` over 'arrived' events, or None
              if there are none.
          AVG_DEP_DELAY: mean ``DEP_DELAY`` over 'departed' events, or None
              if there are none.
          NUM_FLIGHTS: number of events of any type (not distinct flights).
          START_TIME / END_TIME: smallest / largest ``EVENT_TIME``, or None
              when ``events`` is empty.
    """
    # Materialize once: the events are walked several times below and need
    # len(), which a generator or other one-shot iterable cannot support.
    events = list(events)

    arrived = [event['ARR_DELAY'] for event in events if event['EVENT_TYPE'] == 'arrived']
    avg_arr_delay = float(np.mean(arrived)) if arrived else None

    departed = [event['DEP_DELAY'] for event in events if event['EVENT_TYPE'] == 'departed']
    avg_dep_delay = float(np.mean(departed)) if departed else None

    num_flights = len(events)

    # NOTE(review): assumes EVENT_TIME values order chronologically when
    # compared directly (e.g. zero-padded strings in DATETIME_FORMAT) — confirm.
    event_times = [event['EVENT_TIME'] for event in events]
    start_time = min(event_times, default=None)
    latest_time = max(event_times, default=None)

    return {
        'AIRPORT': airport,
        'AVG_ARR_DELAY': avg_arr_delay,
        'AVG_DEP_DELAY': avg_dep_delay,
        'NUM_FLIGHTS': num_flights,
        'START_TIME': start_time,
        'END_TIME': latest_time
    }


def by_airport(event):
    """Pair an event with the airport it should be grouped under.

    'departed' events are keyed by their ORIGIN airport; events of every
    other type are keyed by their DEST airport.

    Returns:
        ``(airport, event)`` tuple; ``event`` is the same object passed in.
    """
    airport_field = 'ORIGIN' if event['EVENT_TYPE'] == 'departed' else 'DEST'
    return event[airport_field], event


def run(project, bucket, region):
    argv = [
        '--project={0}'.format(project),
        '--job_name=ch04avgdelay',
        '--streaming',
        '--save_main_session',
        '--staging_location=gs://{0}/flights/staging/'.format(bucket),
        '--temp_location=gs://{0}/flights/temp/'.format(bucket),
        '--autoscaling_algorithm=THROUGHPUT_BASED',
        '--max_num_workers=8',
        '--region={}'.format(region),
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



