## (Bonus) Streaming data prediction using Cloud ML Engine 

This notebook illustrates:

1. Create a PubSub Topic and Subscription.
2. Create a Dataflow Streaming pipeline to consume messages.
3. Use the deployed Cloud ML Engine model API to make predictions.
4. Store the data and the predictions in BigQuery.
5. Run a stream data simulator.

In [None]:
# Timestamp format used when stamping simulated messages.
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

# GCP project, staging bucket and region used by the Dataflow job.
PROJECT = 'cloud-training-demos'
STG_BUCKET = 'cloud-training-demos-ml'
REGION = 'us-central1'

# BigQuery sink.
DATASET, TABLE = 'playground_ds', 'babyweight_estimates'

# Pub/Sub source.
TOPIC, SUBSCRIPTION = 'babyweights', 'babyweights-sub'

# Deployed Cloud ML Engine model.
MODEL_NAME, VERSION = 'babyweight_estimator', 'v1'

In [None]:
%%bash

# Install the Dataflow SDK, then pin Apache Beam to 2.3 and six to 1.10.
# NOTE(review): google-cloud-dataflow is installed unpinned while apache_beam is
# pinned to 2.3 afterwards — confirm the two resolve to compatible versions.
# NOTE(review): later cells use the older google-cloud-pubsub API
# (pubsub.Client(), topic.exists(), subscription.pull()); that package is not
# installed or pinned here — presumably it must be an older (pre-1.0) release.
pip install google-cloud-dataflow
pip install apache_beam==2.3
pip install six==1.10

In [None]:
import time
import datetime
from google.cloud import pubsub
import json
import apache_beam as beam
import os
# Show the installed Apache Beam version. print() with a single argument behaves
# the same on Python 2 and 3, unlike the Python-2-only print statement.
print(beam.__version__)

## Create PubSub Topic and Subscription

In [None]:
# Create (if needed) the Pub/Sub topic the simulator publishes to and the
# Dataflow pipeline reads from.
client = pubsub.Client()
topic = client.topic(TOPIC)

if not topic.exists():
    print('Creating pub/sub topic {}...'.format(TOPIC))
    topic.create()

print('Pub/sub topic {} is up and running'.format(TOPIC))
print("")

# Create the subscription now, before anything is published: a Pub/Sub
# subscription only receives messages published after it exists, so creating
# it only in the consume cell (after the simulator has run) would miss them.
subscription = topic.subscription(SUBSCRIPTION)
if not subscription.exists():
    print('Creating pub/sub subscription {}...'.format(SUBSCRIPTION))
    subscription.create(client=client)

print('Pub/sub subscription {} is up and running'.format(SUBSCRIPTION))
print("")

## Submit Dataflow Stream Processing Job

### Data source (PubSub topic) and sink (BigQuery table)

In [None]:
pubsub_topic = "projects/{}/topics/{}".format(PROJECT, TOPIC)

# Column name -> BigQuery type for the sink table. Each row written is one
# prediction: the input features plus 'estimated_weight_kg' and the
# 'source_id' / 'source_timestamp' metadata added by estimate_weight().
schema_definition = {
    'source_id':'INTEGER',
    'source_timestamp':'TIMESTAMP',
    'estimated_weight_kg':'FLOAT',
    'is_male': 'STRING',
    'mother_age': 'FLOAT',
    'mother_race': 'STRING',
    'plurality': 'FLOAT',
    'gestation_weeks': 'INTEGER',
    'mother_married': 'BOOLEAN',
    'cigarette_use': 'BOOLEAN',
    'alcohol_use': 'BOOLEAN'
}

# WriteToBigQuery takes a compact 'name:TYPE,name:TYPE,...' schema string.
# Build it explicitly rather than by stripping characters from str(dict).
schema = ','.join(
    '{}:{}'.format(column, bq_type) for column, bq_type in schema_definition.items()
)

print('Pub/Sub Topic URL: {}'.format(pubsub_topic))
print('')
print('BigQuery Dataset: {}'.format(DATASET))
print('BigQuery Table: {}'.format(TABLE))
print('')
print('BigQuery Table Schema: {}'.format(schema))

### Cloud ML Engine prediction function

In [None]:
# Lazily-created Cloud ML Engine API client, reused by every estimate_weight()
# call in the same process.
cmle_api = None


def estimate_weight(json_message):
    """Get a baby-weight prediction from Cloud ML Engine for one Pub/Sub message.

    Args:
      json_message: JSON string for one data point. It must contain
        'source_id' and 'source_timestamp' in addition to the model's input
        features.

    Returns:
      dict: the input features with 'estimated_weight_kg' added and
      'source_id' / 'source_timestamp' restored, i.e. one BigQuery row.
    """
    import json
    from googleapiclient import discovery
    from oauth2client.client import GoogleCredentials

    global cmle_api

    # Build the API client only once per process. globals().get() also copes
    # with a process where the module-level `cmle_api = None` never ran
    # (NOTE(review): e.g. a Dataflow worker without the saved main session).
    if globals().get('cmle_api') is None:
        credentials = GoogleCredentials.get_application_default()
        cmle_api = discovery.build('ml', 'v1', credentials=credentials,
                              discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json',
                              cache_discovery=False)

    instance = json.loads(json_message)
    # The ids are metadata, not model features: strip them before predicting
    # and put them back on the output row afterwards.
    source_id = instance.pop('source_id')
    source_timestamp = instance.pop('source_timestamp')

    request_data = {'instances': [instance]}

    model_url = 'projects/{}/models/{}/versions/{}'.format(PROJECT, MODEL_NAME, VERSION)
    response = cmle_api.projects().predict(body=request_data, name=model_url).execute()

    # Exactly one instance was sent, so only the first prediction is needed.
    # NOTE(review): assumes the model returns a scalar estimate in pounds under
    # 'scores' (implied by the lb->kg factor below) — confirm against the model.
    estimated_weight_lbs = round(response["predictions"][0]["scores"], 2)

    # Convert the full value; truncating to int first would drop up to ~0.45 kg.
    estimated_weight_kg = round(float(estimated_weight_lbs) * 0.453592, 2)

    instance['estimated_weight_kg'] = estimated_weight_kg
    instance['source_id'] = source_id
    instance['source_timestamp'] = source_timestamp

    return instance

### Beam streaming pipeline

In [None]:
def run_babyweight_estimates_streaming_pipeline():
    """Launch the streaming Dataflow job: Pub/Sub -> Cloud ML Engine -> BigQuery.

    Reads JSON strings from `pubsub_topic`, enriches each with a prediction via
    estimate_weight(), and writes the rows to PROJECT:DATASET.TABLE, creating
    the table with `schema` if it does not exist.

    Returns:
      The PipelineResult from pipeline.run(), so callers can monitor or cancel
      the job.
    """
    job_name = 'ingest-babyweight-estimates-{}'.format(datetime.datetime.now().strftime('%y%m%d-%H%M%S'))
    print('Launching Dataflow job {}'.format(job_name))
    print('Check the Dataflow jobs on Google Cloud Console...')

    # GCS paths always use '/', so build them explicitly rather than with
    # os.path.join (which uses the local platform's separator).
    STG_DIR = 'gs://{}/babyweight'.format(STG_BUCKET)

    options = {
        'region': REGION,
        'staging_location': '{}/tmp/staging'.format(STG_DIR),
        'temp_location': '{}/tmp'.format(STG_DIR),
        'job_name': job_name,
        'project': PROJECT,
        'streaming': True,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        # NOTE(review): Beam's documented flag is `save_main_session`; confirm
        # this key is recognised by the installed SDK version.
        'no_save_main_session': True
      }

    opts = beam.pipeline.PipelineOptions(flags=[], **options)

    pipeline = beam.Pipeline(runner="Dataflow", options=opts)

    (
      pipeline | 'Read data from PubSub' >> beam.io.ReadStringsFromPubSub(topic=pubsub_topic)
               | 'Process message' >> beam.Map(estimate_weight)
               | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=PROJECT, dataset=DATASET, table=TABLE,
                                                                schema=schema,
                                                                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                                                               )
    )

    return pipeline.run()

## Run Pipeline on Dataflow

In [None]:
# Submit the streaming Dataflow job built by run_babyweight_estimates_streaming_pipeline().
run_babyweight_estimates_streaming_pipeline()

## Prepare Sample Data Points

In [None]:
# Four sample births used to drive the stream simulator. Every sample has a
# married, non-smoking mother; each entry below overrides only the fields that
# differ from the shared base record.
_base_instance = {
    'is_male': 'True',
    'mother_age': 26.0,
    'mother_race': 'Asian Indian',
    'plurality': 1.0,
    'gestation_weeks': 39,
    'mother_married': 'True',
    'cigarette_use': 'False',
    'alcohol_use': 'False'
}

instances = [
    dict(_base_instance),
    dict(_base_instance, is_male='False', mother_age=29.0, gestation_weeks=38),
    dict(_base_instance, mother_race='White'),
    dict(_base_instance, mother_race='White', plurality=2.0,
         gestation_weeks=37, alcohol_use='True'),
]

# The base record is only a construction helper; keep the namespace clean.
del _base_instance

## Send Data Points to PubSub

In [None]:
from random import shuffle

# Simulator settings: number of batches and pause (seconds) between batches.
iterations = 10000
sleep_time = 1

last_message = None  # stays None if `instances` is empty
for i in range(iterations):

    # Shuffle a copy so the shared `instances` list keeps its original order.
    batch = list(instances)
    shuffle(batch)

    for data_point in batch:

        source_timestamp = datetime.datetime.now().strftime(TIME_FORMAT)
        # 10-digit id derived from the sample and the send time (the BigQuery
        # source_id column is INTEGER, so keep it numeric).
        source_id = str(abs(hash(str(data_point) + str(source_timestamp))) % (10 ** 10))

        # Build a fresh payload instead of injecting ids into the shared sample dict.
        payload = dict(data_point, source_id=source_id, source_timestamp=source_timestamp)

        last_message = json.dumps(payload)
        # Pub/Sub message data is bytes; the ids also travel as message attributes.
        topic.publish(message=last_message.encode('utf-8'), source_id=source_id, source_timestamp=source_timestamp)

    print("Batch {} was sent to {}. \n\r Last Message was: {}".format(i, topic.full_name, last_message))
    print("")

    time.sleep(sleep_time)

print("Done!")

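## Consume PubSub Topic

Pull one message from the subscription to inspect what the simulator published. Note that a Pub/Sub subscription only receives messages published after the subscription was created.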

In [None]:
# Ensure the subscription exists, then pull and display one message from it.
client = pubsub.Client()
topic = client.topic(TOPIC)

subscription = topic.subscription(name=SUBSCRIPTION)
if not subscription.exists():
    print('Creating pub/sub subscription {}...'.format(SUBSCRIPTION))
    subscription.create(client=client)

print('Pub/sub subscription {} is up and running'.format(SUBSCRIPTION))
print("")

# pull() returns a list of (ack_id, message) pairs. The list can be empty,
# e.g. when the subscription was created after the simulator published.
received = subscription.pull()

if not received:
    print("No messages available on subscription {}".format(SUBSCRIPTION))
else:
    ack_id, message = received[0]
    # Single formatted strings print identically on Python 2 and 3; a
    # multi-argument print(...) would print a tuple on Python 2.
    print("source_id: {}".format(message.attributes["source_id"]))
    print("source_timestamp: {}".format(message.attributes["source_timestamp"]))
    print("")
    print(message.data)
    # Acknowledge so Pub/Sub does not redeliver this message after the ack deadline.
    subscription.acknowledge([ack_id])

Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License