def run_validation()

in data_validation/__main__.py [0:0]


def run_validation(config_manager: ConfigManager, dry_run=False, verbose=False):
    """Run a single validation.

    Args:
        config_manager (ConfigManager): Validation config manager instance.
        dry_run (bool): Print source and target SQL to stdout in lieu of validation.
        verbose (bool): Validation setting to log queries run.
    """
    # Only use cached connection for SQLAlchemy backends that manage reconnects for us.
    source_client = (
        config_manager.source_client
        if clients.is_sqlalchemy_backend(config_manager.source_client)
        else None
    )
    target_client = (
        config_manager.target_client
        if clients.is_sqlalchemy_backend(config_manager.target_client)
        else None
    )
    with DataValidation(
        config_manager.config,
        validation_builder=None,
        result_handler=None,
        verbose=verbose,
        source_client=source_client,
        target_client=target_client,
    ) as validator:

        if dry_run:
            print(
                json.dumps(
                    {
                        "source_query": util.ibis_table_to_sql(
                            validator.validation_builder.get_source_query()
                        ),
                        "target_query": util.ibis_table_to_sql(
                            validator.validation_builder.get_target_query()
                        ),
                    },
                    indent=4,
                )
            )
        else:
            validator.execute()