in managed-connectivity/sample-custom-connector/src/bootstrap.py [0:0]
def run():
    """Run the connector pipeline from configuration to upload.

    Steps, in order:
      1. Read the run configuration with ``cmd_reader.read_args()`` and store
         the value returned by ``secret_manager.get_password`` under
         ``config["password"]``.
      2. Create an ``OracleConnector`` from that configuration.
      3. Write the INSTANCE and DATABASE top entries, then the schema
         entries, then the table and view entries of each schema to
         ``FILENAME``.
      4. Pass ``FILENAME`` to ``gcs_uploader.upload`` once the file is closed.
    """
    config = cmd_reader.read_args()
    secret_name = config["password_secret"]
    config["password"] = secret_manager.get_password(secret_name)
    connector = OracleConnector(config)

    with open(FILENAME, "w", encoding="utf-8") as out:
        # The two top entries are built from the config alone, before any
        # query is issued through the connector.
        instance_entry = top_entry_builder.create(config, EntryType.INSTANCE)
        out.writelines(instance_entry)
        out.write("\n")
        database_entry = top_entry_builder.create(config, EntryType.DATABASE)
        out.writelines(database_entry)
        # NOTE(review): nothing in this function writes a newline after the
        # DATABASE entry -- confirm that write_jsonl (or the builder output)
        # keeps it on its own line.

        # Fetch the schemas once: the USERNAME column drives the per-schema
        # loop below, and the same frame is converted into schema entries.
        raw_schemas_df = connector.get_db_schemas()
        username_rows = raw_schemas_df.select("USERNAME").collect()
        schema_names = [row.USERNAME for row in username_rows]
        schema_entries = entry_builder.build_schemas(config, raw_schemas_df)
        write_jsonl(out, schema_entries.toJSON().collect())

        # For each schema, emit its tables first and its views second.
        for schema in schema_names:
            print(f"Processing tables for {schema}")
            write_jsonl(
                out, process_dataset(connector, config, schema, EntryType.TABLE)
            )
            print(f"Processing views for {schema}")
            write_jsonl(
                out, process_dataset(connector, config, schema, EntryType.VIEW)
            )

    # Upload only after the ``with`` block has closed (and flushed) the file.
    gcs_uploader.upload(config, FILENAME)