in src/dfcx_scrapi/core/agents.py [0:0]
def modify_bq_settings_kwargs(
self,
agent_id: str,
bq_settings: Dict[str, str],
enable_stackdriver_logging: bool = None,
enable_interaction_logging: bool = None,
bigquery_enabled: bool = None,
bigquery_table: str = None
):
"""Map the input kwargs from the method to the bq_settings payload."""
# Update each of the kwargs as provided
if enable_stackdriver_logging is not None:
bq_settings["advanced_settings"]["logging_settings"][
"enable_stackdriver_logging"] = enable_stackdriver_logging
if enable_interaction_logging is not None:
bq_settings["advanced_settings"]["logging_settings"][
"enable_interaction_logging"] = enable_interaction_logging
if bigquery_enabled is not None:
bq_settings["bigquery_export_settings"][
"enabled"] = bigquery_enabled
if bigquery_table is not None:
if not self.bq_table_map:
self.bq_table_map = self.get_bq_dataset_table_map(agent_id)
table_id = self.bq_table_map.get(bigquery_table, None)
if not table_id:
raise KeyError(
f"Table `{bigquery_table}` does not exist in project.")
bq_settings["bigquery_export_settings"]["bigquery_table"] = table_id
return bq_settings