dialogflow-cx/vpc-sc-demo/backend/analytics_utilities.py (105 lines of code) (raw):

# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module for registering user analytics.""" import datetime import hashlib import json import logging import os import uuid from enum import Enum import asset_utilities as asu import get_token import jsonschema import session_utilities as su from google.cloud import bigquery logger = logging.getLogger(__name__) _INACTIVITY_TIMEOUT = {"hours": 8} SCHEMA = { "type": "object", "properties": { "visit_id": {"type": ["string", "null"], "minLength": 32, "maxLength": 32}, "session_id": {"type": ["string", "null"], "minLength": 32, "maxLength": 32}, "user_hash": {"type": ["string", "null"], "minLength": 32, "maxLength": 32}, "action": {"type": "integer", "minimum": 0, "maximum": 5}, "timestamp": {"type": "integer", "minimum": 1672435955}, "action_data": { "type": "object", "properties": { "current_page": {"type": "string"}, "current_tab": {"type": "integer", "minimum": 0, "maximum": 4}, "service": {"type": "string"}, "targets": {"type": "array"}, }, "additionalProperties": False, }, }, "additionalProperties": False, "required": [ "visit_id", "session_id", "user_hash", "action", "timestamp", "action_data", ], } class ACTIONS(Enum): """Actions that a user can take that are logged in the Analytics database.""" FRONTEND = 0 SET_ACTIVE_PAGE = 1 SET_ACTIVE_TUTORIAL_TAB = 2 UPDATE_STATUS = 3 ASSET_STATUS = 4 ASSET_UPDATE = 5 def update_visit_cookie(request, response): """Update the visit_id cookie to a new expire time.""" visit_id = request.cookies.get("visit_id", uuid.uuid4().hex) response.set_cookie( "visit_id", value=visit_id, secure=True, httponly=True, domain=su.user_service_domain(request), expires=datetime.datetime.now() + datetime.timedelta(**_INACTIVITY_TIMEOUT), ) return response def validate_data(instance): """Validate data, with logging if there is a problem.""" try: jsonschema.validate(instance=instance, schema=SCHEMA) instance["action_data"] = json.dumps(instance["action_data"]) except jsonschema.exceptions.ValidationError as exc: logging.error("Validation error: %s", exc) def register_action(request, response, action, data=None): """Register a user action with the analytics database.""" target_database = "prod" if su.is_prod() else "dev" if data is None: data = {} session_id = request.cookies.get("session_id") if session_id: tok_response = get_token.get_token(request, token_type="email") if "response" in tok_response: logging.error( "Error retriving user information: %s", tok_response["response"] ) response.delete_cookie("session_id", domain=su.user_service_domain(request)) response.delete_cookie( "user_logged_in", domain=su.user_service_domain(request) ) user_hash = None else: user_hash = hashlib.md5(tok_response["email"].encode("utf-8")).hexdigest() else: user_hash = None instance = { "visit_id": request.cookies.get("visit_id"), "session_id": session_id, "user_hash": user_hash, "action": action.value, "timestamp": int(datetime.datetime.now().timestamp()), "action_data": data, } validate_data(instance) client = bigquery.Client(credentials=asu.get_credentials()) table = client.get_table( client.dataset(os.environ["ANALYTICS_DATABASE"]).table(target_database) ) insert_response = client.insert_rows_json(json_rows=[instance], table=table) if insert_response: logging.error("Error inserting into analytics database: %s", insert_response) update_visit_cookie(request, response) return response