quests/endtoendml/solutions/serving/python_streaming.ipynb (462 lines of code) (raw):

{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## (Bonus) Streaming data prediction using Cloud ML Engine\n",
    "\n",
    "This notebook illustrates:\n",
    "\n",
    "1. Create a PubSub Topic and Subscription.\n",
    "2. Create a Dataflow Streaming pipeline to consume messages.\n",
    "3. Use the deployed Cloud ML Engine API to make predictions.\n",
    "4. Store the data and the prediction in BigQuery.\n",
    "5. Run a stream data simulator."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Format of the source_timestamp attached to every simulated message.\n",
    "TIME_FORMAT = '%Y-%m-%d %H:%M:%S'\n",
    "\n",
    "# BigQuery sink for the scored messages.\n",
    "DATASET = 'playground_ds'\n",
    "TABLE = 'babyweight_estimates'\n",
    "\n",
    "# Project, staging bucket and region used by the Dataflow job.\n",
    "PROJECT = 'cloud-training-demos'\n",
    "STG_BUCKET = 'cloud-training-demos-ml'\n",
    "REGION = 'us-central1'\n",
    "\n",
    "TOPIC = 'babyweights'\n",
    "# NOTE(review): SUBSCRIPTION is not referenced by the cells below; the\n",
    "# consumer cell creates its own 'consumer' subscription.\n",
    "SUBSCRIPTION='babyweights-sub'\n",
    "\n",
    "# Deployed Cloud ML Engine model version used for online prediction.\n",
    "MODEL_NAME='babyweight_estimator'\n",
    "VERSION='v1'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "pip install apache-beam[gcp]\n",
    "pip install six==1.10"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import datetime\n",
    "from google.cloud import pubsub_v1\n",
    "import json\n",
    "import apache_beam as beam\n",
    "import os\n",
    "print(beam.__version__)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create PubSub Topic and Subscription"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "publisher = pubsub_v1.PublisherClient()\n",
    "project_path = publisher.project_path(PROJECT)\n",
    "topic_path = publisher.topic_path(PROJECT, TOPIC)\n",
    "\n",
    "# Create the topic only if it is missing. The check compares against\n",
    "# topic_path, the same way the consumer cell checks subscription_path\n",
    "# against the listed subscription names.\n",
    "existing_topics = [topic.name for topic in publisher.list_topics(project_path)]\n",
    "if topic_path not in existing_topics:\n",
    "    publisher.create_topic(topic_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Submit Dataflow Stream Processing Job"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Data source (PubSub topic) and sink (BigQuery table)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pubsub_topic = \"projects/{}/topics/{}\".format(PROJECT, TOPIC)\n",
    "\n",
    "# Column name -> BigQuery type for every field of a scored message.\n",
    "schema_definition = {\n",
    "    'source_id':'INTEGER',\n",
    "    'source_timestamp':'TIMESTAMP',\n",
    "    'estimated_weight_kg':'FLOAT',\n",
    "    'is_male': 'STRING',\n",
    "    'mother_age': 'FLOAT',\n",
    "    'mother_race': 'STRING',\n",
    "    'plurality': 'FLOAT',\n",
    "    'gestation_weeks': 'INTEGER',\n",
    "    'mother_married': 'BOOLEAN',\n",
    "    'cigarette_use': 'BOOLEAN',\n",
    "    'alcohol_use': 'BOOLEAN'\n",
    "}\n",
    "\n",
    "# Flatten the mapping into a 'name:TYPE,name:TYPE' string, built explicitly\n",
    "# instead of stripping characters from the dict's repr.\n",
    "schema = ','.join(\n",
    "    '{}:{}'.format(column, column_type)\n",
    "    for column, column_type in schema_definition.items()\n",
    ")\n",
    "\n",
    "print('Pub/Sub Topic URL: {}'.format(pubsub_topic))\n",
    "print('')\n",
    "print('BigQuery Dataset: {}'.format(DATASET))\n",
    "print('BigQuery Table: {}'.format(TABLE))\n",
    "print('')\n",
    "print('BigQuery Table Schema: {}'.format(schema))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Cloud ML Engine prediction function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def estimate_weight(json_message, project=PROJECT, model_name=MODEL_NAME, version=VERSION):\n",
    "    \"\"\"Score one JSON message with the deployed Cloud ML Engine model.\n",
    "\n",
    "    Args:\n",
    "      json_message: JSON string holding the model features plus the\n",
    "        'source_id' and 'source_timestamp' keys.\n",
    "      project, model_name, version: identify the model version to call.\n",
    "        They default to the notebook configuration and are captured when\n",
    "        the function is defined, so the body does not look them up as\n",
    "        globals at call time.\n",
    "\n",
    "    Returns:\n",
    "      The parsed message as a dict, extended with 'estimated_weight_kg'.\n",
    "    \"\"\"\n",
    "    import json\n",
    "    from googleapiclient import discovery\n",
    "    from oauth2client.client import GoogleCredentials\n",
    "\n",
    "    global cmle_api\n",
    "\n",
    "    # Build the API client only once, not every time the function is called.\n",
    "    # globals().get() is used because nothing defines cmle_api before the\n",
    "    # first call; reading the bare name would raise NameError.\n",
    "    if globals().get('cmle_api') is None:\n",
    "        credentials = GoogleCredentials.get_application_default()\n",
    "        cmle_api = discovery.build(\n",
    "            'ml', 'v1', credentials=credentials,\n",
    "            discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json',\n",
    "            cache_discovery=False)\n",
    "\n",
    "    instance = json.loads(json_message)\n",
    "    # The bookkeeping fields are not model features: remove them before\n",
    "    # the request and put them back on the result.\n",
    "    source_id = instance.pop('source_id')\n",
    "    source_timestamp = instance.pop('source_timestamp')\n",
    "\n",
    "    request_data = {'instances': [instance]}\n",
    "\n",
    "    model_url = 'projects/{}/models/{}/versions/{}'.format(project, model_name, version)\n",
    "    response = cmle_api.projects().predict(body=request_data, name=model_url).execute()\n",
    "\n",
    "    # A single instance was sent, so only the first prediction is read.\n",
    "    estimated_weight_lb = response[\"predictions\"][0][\"scores\"]\n",
    "\n",
    "    # Convert pounds to kilograms on the full value; truncating to int\n",
    "    # first would throw away the fractional pounds.\n",
    "    estimated_weight_kg = round(estimated_weight_lb * 0.453592, 2)\n",
    "\n",
    "    instance['estimated_weight_kg'] = estimated_weight_kg\n",
    "    instance['source_id'] = source_id\n",
    "    instance['source_timestamp'] = source_timestamp\n",
    "\n",
    "    return instance"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Beam streaming pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def run_babyweight_estimates_streaming_pipeline():\n",
    "    \"\"\"Launch the streaming Dataflow job: Pub/Sub -> estimate_weight -> BigQuery.\n",
    "\n",
    "    Reads strings from pubsub_topic, maps estimate_weight over them and\n",
    "    writes the resulting dicts to the DATASET.TABLE BigQuery table, which\n",
    "    is created from `schema` if needed.\n",
    "    \"\"\"\n",
    "    # A timestamp suffix gives every launch its own job name.\n",
    "    job_name = 'ingest-babyweight-estimates-{}'.format(datetime.datetime.now().strftime('%y%m%d-%H%M%S'))\n",
    "    print('Launching Dataflow job {}'.format(job_name))\n",
    "    print('Check the Dataflow jobs on Google Cloud Console...')\n",
    "\n",
    "    STG_DIR = 'gs://{}/babyweight'.format(STG_BUCKET)\n",
    "\n",
    "    options = {\n",
    "        'region': REGION,\n",
    "        'staging_location': os.path.join(STG_DIR, 'tmp', 'staging'),\n",
    "        'temp_location': os.path.join(STG_DIR, 'tmp'),\n",
    "        'job_name': job_name,\n",
    "        'project': PROJECT,\n",
    "        'streaming': True,\n",
    "        'teardown_policy': 'TEARDOWN_ALWAYS',\n",
    "        'no_save_main_session': True\n",
    "    }\n",
    "\n",
    "    opts = beam.pipeline.PipelineOptions(flags=[], **options)\n",
    "\n",
    "    pipeline = beam.Pipeline(runner=\"Dataflow\", options=opts)\n",
    "\n",
    "    (\n",
    "        pipeline\n",
    "        | 'Read data from PubSub' >> beam.io.ReadStringsFromPubSub(topic=pubsub_topic)\n",
    "        | 'Process message' >> beam.Map(estimate_weight)\n",
    "        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(\n",
    "            project=PROJECT, dataset=DATASET, table=TABLE,\n",
    "            schema=schema,\n",
    "            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED\n",
    "        )\n",
    "    )\n",
    "\n",
    "    pipeline.run()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run Pipeline on Dataflow"
   ]
  },
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "run_babyweight_estimates_streaming_pipeline()"
 ]
},
{
 "cell_type": "markdown",
 "metadata": {},
 "source": [
  "## Prepare Sample Data Points"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Feature-only sample records; the simulator below adds the per-message\n",
  "# source_id / source_timestamp fields to a copy of each one.\n",
  "instances = [\n",
  "    {\n",
  "        'is_male': 'True',\n",
  "        'mother_age': 26.0,\n",
  "        'mother_race': 'Asian Indian',\n",
  "        'plurality': 1.0,\n",
  "        'gestation_weeks': 39,\n",
  "        'mother_married': 'True',\n",
  "        'cigarette_use': 'False',\n",
  "        'alcohol_use': 'False'\n",
  "    },\n",
  "    {\n",
  "        'is_male': 'False',\n",
  "        'mother_age': 29.0,\n",
  "        'mother_race': 'Asian Indian',\n",
  "        'plurality': 1.0,\n",
  "        'gestation_weeks': 38,\n",
  "        'mother_married': 'True',\n",
  "        'cigarette_use': 'False',\n",
  "        'alcohol_use': 'False'\n",
  "    },\n",
  "    {\n",
  "        'is_male': 'True',\n",
  "        'mother_age': 26.0,\n",
  "        'mother_race': 'White',\n",
  "        'plurality': 1.0,\n",
  "        'gestation_weeks': 39,\n",
  "        'mother_married': 'True',\n",
  "        'cigarette_use': 'False',\n",
  "        'alcohol_use': 'False'\n",
  "    },\n",
  "    {\n",
  "        'is_male': 'True',\n",
  "        'mother_age': 26.0,\n",
  "        'mother_race': 'White',\n",
  "        'plurality': 2.0,\n",
  "        'gestation_weeks': 37,\n",
  "        'mother_married': 'True',\n",
  "        'cigarette_use': 'False',\n",
  "        'alcohol_use': 'True'\n",
  "    }\n",
  "]"
 ]
},
{
 "cell_type": "markdown",
 "metadata": {},
 "source": [
  "## Send Data Points to PubSub"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "from random import shuffle\n",
  "\n",
  "iterations = 100\n",
  "sleep_time = 1\n",
  "\n",
  "for i in range(iterations):\n",
  "\n",
  "    shuffle(instances)\n",
  "\n",
  "    futures = []\n",
  "    for data_point in instances:\n",
  "\n",
  "        source_timestamp = datetime.datetime.now().strftime(TIME_FORMAT)\n",
  "        source_id = str(abs(hash(str(data_point)+str(source_timestamp))) % (10 ** 10))\n",
  "\n",
  "        # Publish a copy: the sample instances stay feature-only, so the\n",
  "        # cell starts from the same data every time it is re-run.\n",
  "        message = dict(data_point, source_id=source_id, source_timestamp=source_timestamp)\n",
  "\n",
  "        # Data must be a bytestring\n",
  "        data = json.dumps(message).encode('utf-8')\n",
  "        # When you publish a message, the client returns a future.\n",
  "        futures.append(publisher.publish(topic_path, data=data))\n",
  "\n",
  "    # Wait for every publish in the batch, so the message below is only\n",
  "    # printed once the batch really went out (result() surfaces failures).\n",
  "    for future in futures:\n",
  "        future.result()\n",
  "\n",
  "    print(\"Batch {} was sent to {}. \\n\\r Last Message was: {}\".format(i, topic_path, data))\n",
  "    print(\"\")\n",
  "\n",
  "    time.sleep(sleep_time)\n",
  "\n",
  "print(\"Done!\")"
 ]
},
{
 "cell_type": "markdown",
 "metadata": {},
 "source": [
  "## Consume PubSub Topic"
 ]
},
{
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
 "source": [
  "import logging\n",
  "import multiprocessing\n",
  "import random\n",
  "import time\n",
  "\n",
  "subscriber = pubsub_v1.SubscriberClient()\n",
  "topic_path = subscriber.topic_path(PROJECT, TOPIC)\n",
  "subscription_name = \"consumer\"\n",
  "subscription_path = subscriber.subscription_path(\n",
  "    PROJECT, subscription_name)\n",
  "\n",
  "# Create the subscription only if the project does not have it yet.\n",
  "existing_subscriptions = [sub.name for sub in subscriber.list_subscriptions(project_path)]\n",
  "if subscription_path not in existing_subscriptions:\n",
  "    subscription = subscriber.create_subscription(\n",
  "        subscription_path, topic_path)\n",
  "\n",
  "NUM_MESSAGES = 2\n",
  "ACK_DEADLINE = 30\n",
  "SLEEP_TIME = 10\n",
  "\n",
  "# The subscriber pulls at most NUM_MESSAGES messages.\n",
  "response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)\n",
  "# Remember how many messages actually arrived for the final report.\n",
  "num_received = len(response.received_messages)\n",
  "\n",
  "multiprocessing.log_to_stderr()\n",
  "logger = multiprocessing.get_logger()\n",
  "logger.setLevel(logging.INFO)\n",
  "\n",
  "def worker(msg):\n",
  "    \"\"\"Simulates a long-running process by sleeping 1-60 seconds.\"\"\"\n",
  "    RUN_TIME = random.randint(1, 60)\n",
  "    logger.info('{}: Running {} for {}s'.format(\n",
  "        time.strftime(\"%X\", time.gmtime()), msg.message.data, RUN_TIME))\n",
  "    time.sleep(RUN_TIME)\n",
  "\n",
  "# `processes` stores process as key and ack id and message as values.\n",
  "processes = dict()\n",
  "for message in response.received_messages:\n",
  "    process = multiprocessing.Process(target=worker, args=(message,))\n",
  "    processes[process] = (message.ack_id, message.message.data)\n",
  "    process.start()\n",
  "\n",
  "while processes:\n",
  "    # Iterate over a snapshot because finished processes are removed\n",
  "    # from the dict inside the loop.\n",
  "    for process in list(processes):\n",
  "        ack_id, msg_data = processes[process]\n",
  "        if process.is_alive():\n",
  "            # The process is still running: reset the ack deadline to\n",
  "            # ACK_DEADLINE; this repeats every SLEEP_TIME seconds.\n",
  "            # `ack_deadline_seconds` must be between 10 to 600.\n",
  "            subscriber.modify_ack_deadline(\n",
  "                subscription_path,\n",
  "                [ack_id],\n",
  "                ack_deadline_seconds=ACK_DEADLINE)\n",
  "            logger.info('{}: Reset ack deadline for {} for {}s'.format(\n",
  "                time.strftime(\"%X\", time.gmtime()),\n",
  "                msg_data, ACK_DEADLINE))\n",
  "        else:\n",
  "            # The process is finished: acknowledge using `ack_id`.\n",
  "            subscriber.acknowledge(subscription_path, [ack_id])\n",
  "            logger.info(\"{}: Acknowledged {}\".format(\n",
  "                time.strftime(\"%X\", time.gmtime()), msg_data))\n",
  "            processes.pop(process)\n",
  "\n",
  "    # If there are still processes running, sleeps the thread.\n",
  "    if processes:\n",
  "        time.sleep(SLEEP_TIME)\n",
  "\n",
  "# Report the number really pulled; NUM_MESSAGES is only the upper bound.\n",
  "print(\"Received and acknowledged {} messages. Done.\".format(num_received))"
 ]
},
{
 "cell_type": "markdown",
 "metadata": {},
 "source": [
  "Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License"
 ]
}
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.15"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}