demo_deployment/firestore_crud.py (60 lines of code) (raw):
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse, logging
from google.cloud import firestore
DEFAULT_DATABASE_NAME = "(default)"
WORKFLOWS_COLLECTION_DEFAULT_NAME = "workflows_scheduling"
def main(args, loglevel):
logging.basicConfig(format="%(levelname)s: %(message)s", level=loglevel)
db = firestore.Client(project=args.gcp_project, database=DEFAULT_DATABASE_NAME)
if hasattr(args, 'workflow_properties'):
print(args.workflow_properties)
#workflow_props = json.loads(args.workflow_properties)
workflow_props = args.workflow_properties
if args.operation_type in('CREATE','UPDATE'):
data = {
"workflows_name": args.workflow_name,
"crond_expression": args.crond_expression,
"time_zone": args.time_zone,
"date_format": args.date_format,
"workflow_status": args.workflow_status,
"workflow_properties": workflow_props
}
if args.operation_type == 'CREATE':
create_doc(db, data)
if args.operation_type == 'UPDATE':
update_doc(db,data)
if args.operation_type == 'DELETE':
delete_doc(db)
def create_doc(db, data):
db.collection(WORKFLOWS_COLLECTION_DEFAULT_NAME).document(args.workflow_name).set(data)
print_documents(db)
def update_doc(db,data):
db.collection(WORKFLOWS_COLLECTION_DEFAULT_NAME).document(args.workflow_name).update(data)
print_documents(db)
def delete_doc(db):
db.collection(WORKFLOWS_COLLECTION_DEFAULT_NAME).document(args.workflow_name).delete()
print_documents(db)
def print_documents(db):
doc_ref = db.collection(WORKFLOWS_COLLECTION_DEFAULT_NAME)
docs = doc_ref.stream()
for doc in docs:
print(f"{doc.id} => {doc.to_dict()}")
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description = "Crud utility for firestore.",
fromfile_prefix_chars = '@' )
parser.add_argument("--operation_type",help="can be 'CREATE','UPDATE' or 'DELETE' ", required=True)
parser.add_argument("--gcp_project",help="gco project containing the workflows_scheduling collection in firestore", required=True)
parser.add_argument("--workflow_name",help="workflow name as named in cloud workflows service", required=True)
args, unknown = parser.parse_known_args()
if args.operation_type in ('CREATE','UPDATE'):
parser.add_argument("--crond_expression",help="crond expression to schedlue the workflows. (eg. '0 7 * * *')", required=True)
parser.add_argument("--time_zone",help="time zone associated with crond expression. (eg. 'America/Los_Angeles')", required=True)
parser.add_argument("--date_format",help="python date format to be passed to the workflow execution (eg. '%Y-%m-%d')", required=True)
parser.add_argument("--workflow_status",help="workflow status can be 'ENABLED' or 'DISABLED' to disable a workflow execution temporarily", required=True)
parser.add_argument("--workflow_properties",help="properties to be passed as workflow input (eg. '{\"database_project_id\":\"prj-111\"}')", default='{}')
parser.add_argument("-v","--verbose",help="increase output verbosity", action="store_true")
args, unknown = parser.parse_known_args()
# Setup logging
if args.verbose:
loglevel = logging.DEBUG
else:
loglevel = logging.INFO
main(args, loglevel)