components/processing/tasks.py (88 lines of code) (raw):

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Invoke tasks to build, deploy and run the Document Processor.

The Cloud tasks read their configuration (project, region, artifact
repository, job name, service account) from environment variables.
"""

import os
import shlex

from invoke import Exit, task

# Directory containing this tasks file (components/processing). Made absolute
# so ``c.cd`` works regardless of how ``__file__`` was reported.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Repository root, two levels above BASE_DIR.
ROOT_DIR = os.path.normpath(os.path.join(BASE_DIR, "..", ".."))

# Cloud Run job name used when PROCESSING_CLOUD_RUN_JOB_NAME is unset.
_DEFAULT_JOB_NAME = "doc-processor"


def _require_env(name):
    """Return the non-empty value of environment variable ``name``.

    Raises:
        Exit: if the variable is unset or empty, so the task stops with a
            clear message instead of passing the text "None" to gcloud or
            docker.
    """
    value = os.getenv(name)
    if not value:
        raise Exit(f"Required environment variable {name} is not set.")
    return value


def _job_name():
    """Return the Cloud Run job name shared by the deploy and execute tasks."""
    return os.getenv("PROCESSING_CLOUD_RUN_JOB_NAME") or _DEFAULT_JOB_NAME


@task
def cloud_run_remote_build(c):
    """Document Processor builder with Cloud Build (Cloud Run)

    This will build and deploy a new instance of Cloud Run using Cloud Build.
    Requires the REGION and PROJECT_ID environment variables.
    """
    region = _require_env("REGION")
    project = _require_env("PROJECT_ID")
    # The build config path and the "." source are relative to the repo root.
    with c.cd(ROOT_DIR):
        c.run(
            "gcloud builds submit "
            f"--region {shlex.quote(region)} "
            f"--project {shlex.quote(project)} "
            '--config "components/processing/terraform/build/cloudbuild.yaml" .',
            pty=True,
        )


@task
def cloud_run_local_deploy(c):
    """Document Processor local deployer (Cloud Run)

    This will deploy a new instance of Cloud Run jobs from the local codebase.
    This will use the local docker and gcloud commands to deploy.

    Requires REPOSITORY_REGION, PROJECT_ID, ARTIFACT_REPO_NAME, REGION and
    SERVICE_ACCOUNT. PROCESSING_CLOUD_RUN_JOB_NAME is optional and defaults
    to "doc-processor".
    """
    # Resolve every required setting before building, so a missing variable
    # is reported before any docker work starts.
    project = _require_env("PROJECT_ID")
    repo = (
        f"{_require_env('REPOSITORY_REGION')}-docker.pkg.dev/"
        f"{project}/{_require_env('ARTIFACT_REPO_NAME')}"
    )
    region = _require_env("REGION")
    service_account = _require_env("SERVICE_ACCOUNT")
    job_name = _job_name()
    image = f"{repo}/{job_name}:latest"
    reqs_dir = os.path.join(ROOT_DIR, "reqs")

    with c.cd(BASE_DIR):
        # "libs" and "terraform/build" are relative to BASE_DIR (the cd above);
        # the "reqs" build context lives at the repository root.
        c.run(
            "docker buildx build "
            "--push "
            "--build-context libs=libs "
            f"--build-context reqs={shlex.quote(reqs_dir)} "
            f"-t {shlex.quote(image)} "
            "terraform/build",
            pty=True,
        )
        c.run(
            "gcloud "
            f"--project {shlex.quote(project)} "
            "run jobs "
            f"--region {shlex.quote(region)} "
            f"deploy {shlex.quote(job_name)} "
            "--task-timeout=60m "
            f"--service-account={shlex.quote(service_account)} "
            f"--image={shlex.quote(image)}"
        )


@task(
    help={
        "process-dir": "Folder of input objects to process",
        "reject-dir": "Reject folder passed to the processor",
        "with-html": "Forwarded to the processor as --with_html",
        "write-json": "Forwarded to the processor as --write_json",
        "write-bigquery": "Forwarded to the processor as --write_bigquery",
    },
)
def cloud_run_execute(
    c, process_dir, reject_dir, with_html=True, write_json=True, write_bigquery=""
):
    """Document Processor execute (Cloud Run)

    This will execute a previously deployed Cloud Run Document Processor
    instance with the specified parameters. It can only operate on GCS
    objects, but will run in a Cloud Run environment.

    The job name is taken from PROCESSING_CLOUD_RUN_JOB_NAME (default
    "doc-processor"), the same name cloud_run_local_deploy deploys to.
    Requires the REGION and PROJECT_ID environment variables.
    """
    region = _require_env("REGION")
    project = _require_env("PROJECT_ID")
    # gcloud takes --args as a comma-separated list: one item per processor
    # argument, passed to the shell as a single quoted word.
    job_args = ",".join(
        [
            process_dir,
            f"--reject_dir={reject_dir}",
            f"--with_html={with_html}",
            f"--write_json={write_json}",
            f"--write_bigquery={write_bigquery}",
        ]
    )
    with c.cd(BASE_DIR):
        c.run(
            "gcloud run jobs "
            f"--region {shlex.quote(region)} "
            f"--project {shlex.quote(project)} "
            f"execute {shlex.quote(_job_name())} "
            f"--args {shlex.quote(job_args)}"
        )


@task(
    help={
        "process-dir": "Folder of input objects to process",
        "reject-dir": "Reject folder passed to the processor",
        "with-html": "Forwarded to process_all_objects as with_html",
        "write-json": "Forwarded to process_all_objects as write_json",
        "write-bigquery": "Forwarded to process_all_objects as write_bigquery",
        "debug": "Set the 'processors' logger to DEBUG instead of INFO",
    },
)
def process(
    c,
    process_dir,
    reject_dir,
    with_html=True,
    write_json=True,
    write_bigquery="",
    debug=True,
):
    """Document Processor (development)

    This will apply the Document Processor to the objects in the specified
    folder and run on the development machine. It will operate on GCS or
    local files as specified.
    """
    # Imported inside the task, so loading this tasks file (and running the
    # Cloud tasks) does not require the processors package to be importable.
    import logging

    from processors.base.gcsio import GCSPath
    from processors.msg.main_processor import process_all_objects

    logging.basicConfig(format="%(asctime)s %(name)s: %(message)s")
    logging.getLogger("processors").setLevel(logging.DEBUG if debug else logging.INFO)

    # Process everything
    process_all_objects(
        GCSPath(process_dir),
        GCSPath(reject_dir),
        with_html=with_html,
        write_json=write_json,
        write_bigquery=write_bigquery,
    )