in components/specialized-parser/src/parser_main.py [0:0]
def run() -> None:
    """Entry point for the specialized-parser job.

    Reads all configuration from environment variables, validates the
    Document AI processor id, assembles the per-subsystem config objects,
    and hands off to SpecializedParserJobRunner.

    Required environment variables:
        PROCESSOR_ID, GCS_INPUT_PREFIX, GCS_OUTPUT_URI, BQ_TABLE,
        ALLOYDB_INSTANCE, ALLOYDB_DATABASE, ALLOYDB_USER
    Optional:
        RUN_ID (default "no-run-id-specified"),
        PROCESSOR_TIMEOUT in seconds (default 600)

    Raises:
        KeyError: if a required environment variable is missing.
        ValueError: if PROCESSOR_ID is missing or malformed.
    """
    # required params via environment variables
    print("Reading environment variables for configuration")
    # SECURITY: do NOT dump the whole os.environ — the container environment
    # commonly carries credentials/tokens (e.g. DB passwords, service-account
    # material) that must never reach stdout/logs. Echo only the variables
    # this job actually consumes.
    _config_vars = (
        "PROCESSOR_ID",
        "GCS_INPUT_PREFIX",
        "GCS_OUTPUT_URI",
        "BQ_TABLE",
        "RUN_ID",
        "PROCESSOR_TIMEOUT",
        "ALLOYDB_INSTANCE",
        "ALLOYDB_DATABASE",
        "ALLOYDB_USER",
    )
    for _name in _config_vars:
        print(f"{_name}={os.environ.get(_name)!r}")

    processor_id = os.environ["PROCESSOR_ID"]
    gcs_input_prefix = os.environ["GCS_INPUT_PREFIX"]
    gcs_output_uri = os.environ["GCS_OUTPUT_URI"]
    bigquery_metadata_table = os.environ["BQ_TABLE"]

    # is_valid_processor_id returns a falsy value on failure, otherwise a
    # (project, location, processor_id) tuple parsed from the full id.
    valid_processor_tuple = is_valid_processor_id(processor_id)
    if not valid_processor_tuple:
        raise ValueError(f"processor_id is missing or invalid. {processor_id=}")

    job_config = JobConfig(
        run_id=os.environ.get("RUN_ID", "no-run-id-specified"),
        gcs_input_prefix=gcs_input_prefix,
        gcs_output_uri=gcs_output_uri,
    )
    processor_config = ProcessorConfig(
        project=valid_processor_tuple[0],
        location=valid_processor_tuple[1],
        processor_id=valid_processor_tuple[2],
        timeout=int(os.environ.get("PROCESSOR_TIMEOUT", "600")),
    )
    bigquery_config = BigQueryConfig(
        general_output_table_id=bigquery_metadata_table,
    )
    alloydb_config = AlloyDBConfig(
        # alloydb primary instance is set by terraform, and already in the form of:
        # "projects/<PROJECT>/locations/<LOCATION>/clusters/<CLUSTER>/instances/<INSTANCE>"
        # If you override the environment variable, make sure to use the same format.
        primary_instance=os.environ["ALLOYDB_INSTANCE"],
        database=os.environ["ALLOYDB_DATABASE"],
        user=os.environ["ALLOYDB_USER"],
    )
    runner = SpecializedParserJobRunner(
        job_config=job_config,
        alloydb_config=alloydb_config,
        processor_config=processor_config,
        bigquery_config=bigquery_config,
    )
    runner.run()