# data-analytics/beam_ml_toxicity_in_gaming/toxicity_pipeline.py (88 lines of code) (raw):

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Streaming toxicity-detection pipeline for chat messages.

Reads messages from a Pub/Sub topic, keys them by their ``userid`` attribute,
scores each message with two TensorFlow models through Beam's RunInference,
publishes the messages flagged as toxic by the gaming model to a second
Pub/Sub topic, and writes the joined output of both models to BigQuery.
"""

import argparse

import tensorflow as tf
from tensorflow import keras
# NOTE(review): tensorflow_text is never referenced by name in this file. It is
# presumably imported for its side effect of registering the TF Text ops the
# saved models rely on -- confirm before removing.
import tensorflow_text
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.base import KeyedModelHandler


class tag_with_key(beam.DoFn):
    """Key each Pub/Sub message by its ``userid`` attribute."""

    def process(self, element):
        """Yield ``(userid, text)`` for one Pub/Sub message.

        Args:
            element: A message read with ``with_attributes=True``; its
                ``attributes`` mapping and ``data`` bytes are used.

        Yields:
            A ``(key, text)`` tuple where ``key`` is the ``userid`` attribute
            and ``text`` is the payload decoded as UTF-8.

        Raises:
            KeyError: If the message carries no ``userid`` attribute.
            UnicodeDecodeError: If the payload is not valid UTF-8.
        """
        # In this pardo, we key our elements using the attributes of the message
        yield (element.attributes["userid"], element.data.decode("UTF-8"))


class flag_for_toxic(beam.DoFn):
    """Tag each keyed prediction as ``"not"`` (not nice) or ``"nice"``."""

    def __init__(self, threshold=-0.5):
        """Store the score threshold used to flag an element.

        Args:
            threshold: Scores strictly greater than this value are tagged
                ``"not"``. The default is an arbitrary demonstration value;
                align it with the model you actually deploy.
        """
        super().__init__()
        self.threshold = threshold

    def process(self, element):
        """Yield ``(flag, element)`` for one ``(key, PredictionResult)`` pair.

        Args:
            element: A ``(key, prediction)`` tuple as emitted by RunInference
                with a ``KeyedModelHandler``.

        Yields:
            ``("not", element)`` when the score exceeds the threshold,
            otherwise ``("nice", element)``.
        """
        # Parsing the output of the inference
        # We need to pull out the tensor and convert it to numpy
        # The inference is read through the PredictionResult field rather than
        # a positional index so the intent stays readable
        prediction = element[1]
        tox_level = prediction.inference.numpy().item()

        # The threshold is an arbitrary value to determine toxicity
        # This value is something you'll need to align with the model
        if tox_level > self.threshold:
            yield ("not", element)
        else:
            yield ("nice", element)


def run(project_id, gaming_model_location, movie_model_location, pipeline_args):
    """Build and run the toxicity pipeline.

    Args:
        project_id: Google Cloud project that owns the ``tox-input`` and
            ``tox-output`` topics and the ``demo.tox`` BigQuery table.
        gaming_model_location: Model URI handed to the gaming model handler.
        movie_model_location: Model URI handed to the movie model handler.
        pipeline_args: Remaining command-line arguments, forwarded to
            ``PipelineOptions``.
    """
    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)

    # We are using a topic for input
    # Pub/Sub IO will automatically create a subscription for us
    input_topic = f"projects/{project_id}/topics/tox-input"
    output_topic = f"projects/{project_id}/topics/tox-output"
    output_bigquery = f"{project_id}:demo.tox"

    with beam.Pipeline(options=pipeline_options) as p:
        # We first read from Pub/Sub
        # Because it's a streaming pipeline, we need to apply a window for the join
        # Finally we key the data so we can join it back after the A/B test
        read_from_pubsub = (
            p
            | "Read from PubSub" >> beam.io.ReadFromPubSub(
                topic=input_topic, with_attributes=True)
            # In this particular example, we aren't worried about an accurate window
            # If uniqueness is an issue, we can switch to using message ID of each message
            # The message ID will be unique and will ensure uniqueness
            | "Window data" >> beam.WindowInto(beam.window.FixedWindows(0.1))
            | "Key up input" >> beam.ParDo(tag_with_key())
        )

        # Load the model into a handler
        # We use KeyedModelHandler here to automatically handle the incoming keys
        # It also returns the key so you can preserve the key and use it after the prediction
        gaming_model_handler = KeyedModelHandler(
            TFModelHandlerTensor(
                model_uri=gaming_model_location,
                load_model_args={"compile": False},
            )
        )

        # Use the handler to perform inference
        # Note that the gaming toxicity score is based on "toxic or not"
        # The scale differs from the movie model
        gaming_inference = (
            read_from_pubsub
            | "Perform gaming inference" >> RunInference(gaming_model_handler)
        )

        # Flag the values so we can determine if toxic or not
        nice_or_not = (
            gaming_inference
            | "Flag toxicity" >> beam.ParDo(flag_for_toxic())
        )

        # Print to screen so we can see the results
        _ = nice_or_not | "Print results" >> beam.Map(print)

        # Filter, if toxic then write to Pub/Sub
        # "Not" denotes not nice
        not_filter = (
            nice_or_not
            | "Keep toxic only" >> beam.Filter(lambda outcome: outcome[0] == "not")
        )

        # Write to Pub/Sub
        _ = (
            not_filter
            | "Convert to bytestring" >> beam.Map(
                lambda element: bytes(str(element[1]), "UTF-8"))
            | "Write to PubSub" >> beam.io.WriteToPubSub(topic=output_topic)
        )

        # Load the model into a handler
        movie_model_handler = KeyedModelHandler(
            TFModelHandlerTensor(
                model_uri=movie_model_location,
                load_model_args={"compile": False},
            )
        )

        # Note that the movie score differs in scoring
        # "negative" would mean negative values
        # "positive" would mean positive values
        # Use the handler to perform inference
        movie_inference = (
            read_from_pubsub
            | "Perform movie inference" >> RunInference(movie_model_handler)
        )

        # We join up the data so we can compare the values later
        joined = (
            {'gaming': gaming_inference, 'movie': movie_inference}
            | 'Join' >> beam.CoGroupByKey()
        )

        # Simple string schema - normally not recommended
        # For brevity sake, we convert to a single string
        data_schema = {
            'fields': [
                {'name': 'data_col', 'type': 'STRING', 'mode': 'NULLABLE'},
            ]
        }

        # Write to BigQuery
        # We're converting to the simple string to insert
        _ = (
            joined
            | "Convert to string" >> beam.Map(
                lambda element: {"data_col": str(element)})
            | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
                method=beam.io.gcp.bigquery.WriteToBigQuery.Method.STREAMING_INSERTS,
                table=output_bigquery,
                schema=data_schema,
            )
        )


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--project_id',
        dest='project_id',
        required=True,
        help='project id')
    parser.add_argument(
        '--gaming',
        dest='gaming_loc',
        required=True,
        help='location of gaming model')
    parser.add_argument(
        '--movie',
        dest='movie_loc',
        required=True,
        help='location of movie model')

    # Anything argparse does not recognise is passed through to Beam
    known_args, pipeline_args = parser.parse_known_args()
    run(
        known_args.project_id,
        known_args.gaming_loc,
        known_args.movie_loc,
        pipeline_args,
    )