def run()

in data-analytics/beam_ml_toxicity_in_gaming/exercises/part1.py


def run(project_id, gaming_model_location, movie_model_location, pipeline_args):
    pipeline_options = PipelineOptions(
        pipeline_args, save_main_session=True)
    
    # We are using a topic for input
    # Pub/Sub IO will automatically create a subscription for us
    input_topic = "projects/{}/topics/tox-input".format(project_id)
    output_topic = "projects/{}/topics/tox-output".format(project_id)
    output_bigquery = "{}:demo.tox".format(project_id)

    with beam.Pipeline(options=pipeline_options) as p:

        # We first read from Pub/Sub
        # Because it's a streaming pipeline, we need to apply a window for the join
        # Finally we key the data so we can join it back after the A/B test
        # TODO: Follow Step 3: Create the pipeline to read from the input topic
        # TODO: Follow Step 4: Window the incoming element
        # TODO: Follow Step 5: Tag your element with the key
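
        # --- Illustrative sketch of Steps 3-5 only, not the lab's reference solution.
        # Assumptions: messages arrive as UTF-8 text, a 30-second fixed window is
        # acceptable, and each element is keyed by the chat text itself so the two
        # model outputs can be joined on that key after the A/B test. The variable
        # name `chats` is arbitrary.
        chats = (
            p
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=input_topic)
            | "DecodeUTF8" >> beam.Map(lambda msg: msg.decode("utf-8"))
            | "Window" >> beam.WindowInto(beam.window.FixedWindows(30))
            | "KeyByChatText" >> beam.Map(lambda text: (text, text))
        )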

        # Load the model into a handler
        # We use KeyedModelHandler here to automatically handle the incoming keys
        # It also returns the key alongside each prediction, so you can keep using it downstream
        # TODO: Follow Step 6: Create the model handler
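
        # --- Illustrative sketch of Step 6, not the lab's reference solution.
        # KeyedModelHandler wraps a framework-specific handler; TFModelHandlerTensor
        # is an assumption here (the lab's model format and handler may differ), and
        # the sketch assumes these imports at the top of the file:
        #   from apache_beam.ml.inference.base import KeyedModelHandler, RunInference
        #   from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor
        # The movie model would get a parallel handler built from movie_model_location.
        gaming_model_handler = KeyedModelHandler(
            TFModelHandlerTensor(model_uri=gaming_model_location))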

        # Use the handler to perform inference
        # Note that the gaming toxicity score is based on "toxic or not"
        # The scale differs from the movie model
        # TODO: Follow Step 7: Submit the input to the model for a result
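
        # --- Illustrative sketch of Step 7, not the lab's reference solution.
        # RunInference emits (key, PredictionResult) tuples, so the chat text keyed
        # in Step 5 stays attached to each prediction. `chats` and
        # `gaming_model_handler` come from the sketches above; `gaming_inference`
        # is the name the code below expects.
        gaming_inference = (
            chats
            | "RunGamingInference" >> RunInference(gaming_model_handler)
        )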

        # Flag the values so we can determine if toxic or not
        # TODO: Apply the correct DoFn from above as instructed in Step 8: Parse your results from the prediction
        nice_or_not = (
            gaming_inference
            | beam.ParDo()  # TODO: Put the right DoFn here
        )
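
        # --- For reference only: a hypothetical DoFn with the shape Step 8 expects.
        # The real DoFn is defined earlier in this file; the class name, threshold,
        # and output format below are assumptions, not the lab's implementation.
        #
        # class FlagToxicGaming(beam.DoFn):
        #     def process(self, element):
        #         chat, prediction = element           # (key, PredictionResult) from RunInference
        #         score = float(prediction.inference)  # gaming model scores "toxic or not"
        #         yield chat, {"toxic": score >= 0.5, "score": score}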