def run()

in data-analytics/beam_ml_toxicity_in_gaming/exercises/toxicity_pipeline.py
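
For reference, a minimal sketch of the imports this excerpt relies on, assuming the stock Beam module layout; tag_with_key, flag_for_toxic, and extendTFModelHandlerTensor are custom helpers defined elsewhere in toxicity_pipeline.py and are not reproduced here:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.ml.inference.base import KeyedModelHandler, RunInference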


def run(project_id, gaming_model_location, movie_model_location, pipeline_args):
    pipeline_options = PipelineOptions(
        pipeline_args, save_main_session=True)
    
    # We are using a topic for input
    # Pub/Sub IO will automatically create a subscription for us
    input_topic = "projects/{}/topics/tox-input".format(project_id)
    output_topic = "projects/{}/topics/tox-output".format(project_id)
    output_bigquery = "{}:demo.tox".format(project_id)

    with beam.Pipeline(options=pipeline_options) as p:

        # We first read from Pub/Sub
        # Because it's a streaming pipeline, we need to apply a window for the join
        # Finally we key the data so we can join it back after the A/B test
        read_from_pubsub = (
            p 
            | "Read from PubSub" >> beam.io.ReadFromPubSub(topic=input_topic,with_attributes=True)
            # In this particular example, we aren't worried about an accurate window
            # If uniqueness is an issue, we can switch to using message ID of each message 
            # The message ID will be unique and will ensure uniqueness 
            | "Window data" >> beam.WindowInto(beam.window.FixedWindows(0.1))
            | "Key up input" >> beam.ParDo(tag_with_key())
        )

        # Load the model into a handler
        # We use KeyedModelHandler here to automatically handle the incoming keys
        # It also returns the key so you can preserve the key and use it after the prediction
        gaming_model_handler = KeyedModelHandler(extendTFModelHandlerTensor(gaming_model_location))

        # Use the handler to perform inference
        # Note that the gaming toxicity score is based on "toxic or not"
        # The scale differs from the movie model
        gaming_inference = (
            read_from_pubsub 
            | "Perform gaming inference" >> RunInference(gaming_model_handler)
        )

        # Flag the values so we can determine whether each message is toxic or not
        nice_or_not = (
            gaming_inference 
            | beam.ParDo(flag_for_toxic())
        )
        
        # Print to screen so we can see the results
        nice_or_not | beam.Map(print)

        # Filter: if toxic, write the message to Pub/Sub
        # "not" denotes not nice (i.e. toxic)
        not_filter = nice_or_not | beam.Filter(lambda outcome: outcome[0] == "not")
        
        # Write to Pub/Sub
        _ = (not_filter 
            | "Convert to bytestring" >> beam.Map(lambda element: bytes(str(element[1]),"UTF-8"))
            | beam.io.WriteToPubSub(topic=output_topic)
        )

        # Load the model into a handler
        movie_model_handler = KeyedModelHandler(extendTFModelHandlerTensor(movie_model_location))


        # Note that the movie model differs in scoring:
        # "negative" corresponds to negative values
        # "positive" corresponds to positive values
        # Use the handler to perform inference
        movie_inference = (
            read_from_pubsub 
            | "Perform movie inference" >> RunInference(movie_model_handler)
        )

        # We join up the data so we can compare the values later
        joined = (
            ({'gaming': gaming_inference, 'movie': movie_inference})
            | 'Join' >> beam.CoGroupByKey()
        )

        # Simple string schema - normally not recommended
        # For brevity's sake, we convert each element to a single string
        data_schema = {'fields': [
            {'name': 'data_col', 'type': 'STRING', 'mode': 'NULLABLE'}]}
        
        # Write to BigQuery
        # We convert each joined element to a single string before inserting
        _ = (
            joined
            | "Convert to string" >> beam.Map(lambda element: {"data_col": str(element)})
            | beam.io.gcp.bigquery.WriteToBigQuery(
                method=beam.io.gcp.bigquery.WriteToBigQuery.Method.STREAMING_INSERTS,
                table=output_bigquery,
                schema=data_schema
            )
        )
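
The exercise file presumably wires run() up from an entry point; a minimal sketch of what that could look like is below. The flag names here are assumptions chosen for illustration, not necessarily the ones used in the exercise.

if __name__ == "__main__":
    import argparse
    import logging

    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    # These flag names are illustrative only
    parser.add_argument("--project_id", required=True)
    parser.add_argument("--gaming_model_location", required=True)
    parser.add_argument("--movie_model_location", required=True)
    known_args, pipeline_args = parser.parse_known_args()

    # ReadFromPubSub is an unbounded source, so the pipeline must run in
    # streaming mode; --streaming would typically be among the pipeline args
    run(
        known_args.project_id,
        known_args.gaming_model_location,
        known_args.movie_model_location,
        pipeline_args,
    )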