perfkitbenchmarker/providers/gcp/gcp_spanner.py (506 lines of code) (raw):
# Copyright 2017 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module containing class for GCP's spanner instances.
Instances can be created and deleted. Providing an instance ID and database ID
causes the instance to be user_managed, meaning PKB will not manage any
lifecycle for the instance.
"""
import dataclasses
import datetime
import json
import logging
import statistics
from typing import Any, Dict
from absl import flags
from google.cloud import monitoring_v3
from google.cloud.monitoring_v3 import query
import numpy as np
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import errors
from perfkitbenchmarker import relational_db
from perfkitbenchmarker import relational_db_spec
from perfkitbenchmarker import sql_engine_utils
from perfkitbenchmarker.configs import option_decoders
from perfkitbenchmarker.configs import spec
from perfkitbenchmarker.providers.gcp import util
import requests
FLAGS = flags.FLAGS
flags.DEFINE_string(
'cloud_spanner_config',
None,
'The config for the Cloud Spanner instance. Use default config if unset.',
)
flags.DEFINE_integer(
'cloud_spanner_nodes',
None,
'The number of nodes for the Cloud Spanner instance.',
)
_LOAD_NODES = flags.DEFINE_integer(
'cloud_spanner_load_nodes',
None,
'The number of nodes for the Cloud Spanner instance to use for the load'
' phase. Assumes that the benchmark calls UpdateRunCapacity to set the '
' correct node count manually before the run phase.',
)
flags.DEFINE_string(
'cloud_spanner_project',
None,
'The project for the Cloud Spanner instance. Use default project if unset.',
)
flags.DEFINE_string(
'cloud_spanner_instance',
None,
'The name of the static Cloud Spanner instance. New '
'instance created if unset.',
)
flags.DEFINE_string(
'cloud_spanner_database',
None,
'The name of the static Cloud Spanner database. New '
'database created if unset. cloud_spanner_instance flag is '
'mandatory to use this flag.',
)
# Flags related to managed autoscaler
# https://cloud.google.com/spanner/docs/create-manage-instances#gcloud
flags.DEFINE_boolean(
'cloud_spanner_autoscaler',
False,
'Turn on the autoscaler for the spanner instance.',
)
flags.DEFINE_integer(
'cloud_spanner_autoscaler_min_processing_units',
None,
'Minimum number of processing units. Autoscaler will not go lower than'
' this.',
)
flags.DEFINE_integer(
'cloud_spanner_autoscaler_max_processing_units',
None,
'Maximum number of processing units. Autoscaler will not go higher than'
' this.',
)
flags.DEFINE_integer(
'cloud_spanner_autoscaler_high_priority_cpu_target',
None,
'The CPU usage when spanner starts scaling.',
)
flags.DEFINE_integer(
'cloud_spanner_autoscaler_storage_target',
None,
'Storage target when spanner starts scaling.',
)
# Type aliases
_RelationalDbSpec = relational_db_spec.RelationalDbSpec
_DEFAULT_REGION = 'us-central1'
_DEFAULT_DESCRIPTION = 'Created by PKB.'
_DEFAULT_ENDPOINT = 'https://spanner.googleapis.com'
_DEFAULT_NODES = 1
_FROZEN_NODE_COUNT = 1
_DEFAULT_MIN_PROCESSING_UNITS = 5000
_DEFAULT_MAX_PROCESSING_UNITS = 50000
_DEFAULT_HIGH_PRIORITY_CPU_TARGET = 65
_DEFAULT_STORAGE_TARGET = 95
# Dialect options
GOOGLESQL = 'GOOGLE_STANDARD_SQL'
POSTGRESQL = 'POSTGRESQL'
_VALID_DIALECTS = frozenset([GOOGLESQL, POSTGRESQL])
# Common decoder configuration option.
_NONE_OK = {'default': None, 'none_ok': True}
# Spanner CPU Monitoring API has a 3 minute delay. See
# https://cloud.google.com/monitoring/api/metrics_gcp#gcp-spanner,
CPU_API_DELAY_MINUTES = 3
CPU_API_DELAY_SECONDS = CPU_API_DELAY_MINUTES * 60
# For more information on QPS expectations, see
# https://cloud.google.com/spanner/docs/performance#typical-workloads
_READ_OPS_PER_NODE = 10000
_WRITE_OPS_PER_NODE = 2000
_ADJUSTED_READ_OPS_PER_NODE = 15000
_ADJUSTED_WRITE_OPS_PER_NODE = 3000
_ADJUSTED_CONFIG_REGIONS = ['regional-us-east4']
@dataclasses.dataclass
class SpannerSpec(relational_db_spec.RelationalDbSpec):
"""Configurable options of a Spanner instance."""
SERVICE_TYPE = 'spanner'
spanner_instance_id: str
spanner_database_id: str
spanner_description: str
spanner_config: str
spanner_nodes: int
spanner_load_nodes: int
spanner_project: str
spanner_autoscaler: bool
spanner_min_processing_units: int
spanner_max_processing_units: int
spanner_high_priority_cpu_target: int
spanner_storage_target: int
def __init__(
self,
component_full_name: str,
flag_values: flags.FlagValues | None = None,
**kwargs,
):
super().__init__(component_full_name, flag_values=flag_values, **kwargs)
@classmethod
def _GetOptionDecoderConstructions(cls) -> dict[str, Any]:
"""Gets decoder classes and constructor args for each configurable option.
Returns:
dict. Maps option name string to a (ConfigOptionDecoder class, dict) pair.
The pair specifies a decoder class and its __init__() keyword arguments
to construct in order to decode the named option.
"""
result = super()._GetOptionDecoderConstructions()
result.update({
'spanner_instance_id': (option_decoders.StringDecoder, _NONE_OK),
# Base class supplies a database_name, but this is easier to find.
'spanner_database_id': (option_decoders.StringDecoder, _NONE_OK),
'spanner_description': (option_decoders.StringDecoder, _NONE_OK),
'spanner_config': (option_decoders.StringDecoder, _NONE_OK),
'spanner_nodes': (option_decoders.IntDecoder, _NONE_OK),
'spanner_load_nodes': (option_decoders.IntDecoder, _NONE_OK),
'spanner_project': (option_decoders.StringDecoder, _NONE_OK),
'db_spec': (spec.PerCloudConfigDecoder, _NONE_OK),
'db_disk_spec': (spec.PerCloudConfigDecoder, _NONE_OK),
'spanner_autoscaler': (option_decoders.BooleanDecoder, _NONE_OK),
'spanner_min_processing_units': (option_decoders.IntDecoder, _NONE_OK),
'spanner_max_processing_units': (option_decoders.IntDecoder, _NONE_OK),
'spanner_high_priority_cpu_target': (
option_decoders.IntDecoder,
_NONE_OK,
),
'spanner_storage_target': (option_decoders.IntDecoder, _NONE_OK),
})
return result
@classmethod
def _ApplyFlags(
cls, config_values: dict[str, Any], flag_values: flags.FlagValues
) -> None:
"""Modifies config options based on runtime flag values.
Can be overridden by derived classes to add support for specific flags.
Args:
config_values: dict mapping config option names to provided values. May be
modified by this function.
flag_values: flags.FlagValues. Runtime flags that may override the
provided config values.
"""
super()._ApplyFlags(config_values, flag_values)
if flag_values['cloud_spanner_instance'].present:
config_values['spanner_instance_id'] = flag_values.cloud_spanner_instance
if flag_values['cloud_spanner_database'].present:
config_values['spanner_database_id'] = flag_values.cloud_spanner_database
if flag_values['cloud_spanner_config'].present:
config_values['spanner_config'] = flag_values.cloud_spanner_config
if flag_values['cloud_spanner_nodes'].present:
config_values['spanner_nodes'] = flag_values.cloud_spanner_nodes
if flag_values['cloud_spanner_load_nodes'].present:
config_values['spanner_load_nodes'] = flag_values.cloud_spanner_load_nodes
if flag_values['cloud_spanner_project'].present:
config_values['spanner_project'] = flag_values.cloud_spanner_project
if flag_values['cloud_spanner_autoscaler'].present:
config_values['spanner_autoscaler'] = flag_values.cloud_spanner_autoscaler
if flag_values['cloud_spanner_autoscaler_min_processing_units'].present:
config_values['spanner_min_processing_units'] = (
flag_values.cloud_spanner_autoscaler_min_processing_units
)
if flag_values['cloud_spanner_autoscaler_max_processing_units'].present:
config_values['spanner_max_processing_units'] = (
flag_values.cloud_spanner_autoscaler_max_processing_units
)
if flag_values['cloud_spanner_autoscaler_high_priority_cpu_target'].present:
config_values['spanner_high_priority_cpu_target'] = (
flag_values.cloud_spanner_autoscaler_high_priority_cpu_target
)
if flag_values['cloud_spanner_autoscaler_storage_target'].present:
config_values['spanner_storage_target'] = (
flag_values.cloud_spanner_autoscaler_storage_target
)
class GcpSpannerInstance(relational_db.BaseRelationalDb):
"""Object representing a GCP Spanner Instance.
The project and Cloud Spanner config must already exist. Instance and database
will be created and torn down before and after the test.
The following parameters are overridden by the corresponding FLAGs.
project: FLAGS.cloud_spanner_project
config: FLAGS.cloud_spanner_config
nodes: FLAGS.cloud_spanner_nodes
Attributes:
name: Name of the instance to create.
description: Description of the instance.
database: Name of the database to create
"""
CLOUD = 'GCP'
IS_MANAGED = True
REQUIRED_ATTRS = ['CLOUD', 'IS_MANAGED', 'ENGINE']
def __init__(self, db_spec: SpannerSpec, **kwargs):
super().__init__(db_spec, **kwargs)
self.instance_id = (
db_spec.spanner_instance_id or f'pkb-instance-{FLAGS.run_uri}'
)
self.database = (
db_spec.spanner_database_id or f'pkb-database-{FLAGS.run_uri}'
)
self._description = db_spec.spanner_description or _DEFAULT_DESCRIPTION
self._config = db_spec.spanner_config or self._GetDefaultConfig()
self.nodes = db_spec.spanner_nodes or _DEFAULT_NODES
self._load_nodes = db_spec.spanner_load_nodes or self.nodes
self._api_endpoint = None
self._autoscaler = db_spec.spanner_autoscaler
self._min_processing_units = (
db_spec.spanner_min_processing_units or _DEFAULT_MIN_PROCESSING_UNITS
)
self._max_processing_units = (
db_spec.spanner_max_processing_units or _DEFAULT_MAX_PROCESSING_UNITS
)
self._high_priority_cpu_target = (
db_spec.spanner_high_priority_cpu_target
or _DEFAULT_HIGH_PRIORITY_CPU_TARGET
)
self._storage_target = (
db_spec.spanner_storage_target or _DEFAULT_STORAGE_TARGET
)
# Cloud Spanner may not explicitly set the following common flags.
self.project = (
db_spec.spanner_project or FLAGS.project or util.GetDefaultProject()
)
def _GetDefaultConfig(self) -> str:
"""Gets the config that corresponds the region used for the test."""
try:
region = util.GetRegionFromZone(FLAGS.zone[0])
except IndexError:
region = _DEFAULT_REGION
return f'regional-{region}'
@staticmethod
def GetDefaultEngineVersion(engine: str) -> str:
return 'default'
def _Create(self) -> None:
"""Creates the instance, the database, and update the schema."""
cmd = util.GcloudCommand(
self, 'spanner', 'instances', 'create', self.instance_id
)
cmd.flags['description'] = self._description
cmd.flags['config'] = self._config
if self.spec.db_tier:
cmd.flags['edition'] = self.spec.db_tier
if self._autoscaler:
cmd.use_beta_gcloud = True
cmd.flags['autoscaling-min-processing-units'] = self._min_processing_units
cmd.flags['autoscaling-max-processing-units'] = self._max_processing_units
cmd.flags['autoscaling-high-priority-cpu-target'] = (
self._high_priority_cpu_target
)
cmd.flags['autoscaling-storage-target'] = self._storage_target
else:
cmd.flags['nodes'] = self.nodes
_, _, retcode = cmd.Issue(raise_on_failure=False)
if retcode != 0:
# TODO(user) Currently loops if the database doesn't exist. To fix
# this we should move waiting for the database to be ready from Exists
# to this function.
if self._Exists():
logging.info(
'Found an existing instance, setting user_managed to True.'
)
self.user_managed = True
else:
logging.error('Create GCP Spanner instance failed.')
return
self._UpdateLabels(util.GetDefaultTags())
self.CreateDatabase(self.database)
def CreateTables(self, ddl: str) -> None:
"""Creates the tables specified by the DDL.
If the table already exists, this is a no-op.
Args:
ddl: The DDL statement used to create the table schema.
Raises:
errors.Benchmarks.RunError if updating the DDL fails.
"""
cmd = util.GcloudCommand(
self, 'spanner', 'databases', 'ddl', 'update', self.database
)
cmd.flags['instance'] = self.instance_id
cmd.flags['ddl'] = ddl
_, stderr, retcode = cmd.Issue(raise_on_failure=False)
if 'Duplicate name in schema' in stderr:
logging.info('Table already exists, skipping.')
return
if retcode != 0:
raise errors.Benchmarks.RunError('Failed to update GCP Spanner DDL.')
def _Delete(self) -> None:
"""Deletes the instance."""
cmd = util.GcloudCommand(
self, 'spanner', 'instances', 'delete', self.instance_id
)
_, _, retcode = cmd.Issue(raise_on_failure=False)
if retcode != 0:
logging.error('Delete GCP Spanner instance failed.')
else:
logging.info('Deleted GCP Spanner instance.')
def CreateDatabase(self, database_name: str) -> tuple[str, str]:
"""Creates the database."""
cmd = util.GcloudCommand(
self, 'spanner', 'databases', 'create', database_name
)
cmd.flags['instance'] = self.instance_id
cmd.flags['database-dialect'] = self.dialect
stdout, stderr, retcode = cmd.Issue(raise_on_failure=False)
if retcode != 0:
logging.error('Create GCP Spanner database failed.')
return stdout, stderr
def DeleteDatabase(self, database_name: str) -> tuple[str, str]:
"""Deletes the database."""
cmd = util.GcloudCommand(
self, 'spanner', 'databases', 'delete', database_name
)
cmd.flags['instance'] = self.instance_id
stdout, stderr, _ = cmd.Issue(raise_on_failure=False)
return stdout, stderr
def _Exists(self, instance_only: bool = False) -> bool:
"""Returns true if the instance and the database exists."""
_, retcode = self._DescribeInstance(raise_on_failure=False)
if retcode != 0:
logging.info('Could not find GCP Spanner instance %s.', self.instance_id)
return False
if instance_only:
return True
cmd = util.GcloudCommand(
self, 'spanner', 'databases', 'describe', self.database
)
cmd.flags['instance'] = self.instance_id
# Do not log error or warning when checking existence.
stdout, _, retcode = cmd.Issue(raise_on_failure=False)
if retcode != 0:
logging.info('Could not find GCP Spanner database %s.', self.database)
return False
elif json.loads(stdout)['state'] != 'READY':
logging.info('Waiting for database to be ready.')
return False
return True
def GetApiEndPoint(self) -> str:
"""Returns the API endpoint override for Cloud Spanner."""
if self._api_endpoint:
return self._api_endpoint
cmd = util.GcloudCommand(
self, 'config', 'get-value', 'api_endpoint_overrides/spanner'
)
stdout, _, retcode = cmd.Issue(raise_on_failure=False)
if retcode != 0:
logging.warning('Fail to retrieve cloud spanner end point.')
self._api_endpoint = json.loads(stdout) or _DEFAULT_ENDPOINT
return self._api_endpoint
def _WaitUntilInstanceReady(self) -> None:
"""Waits until the instance is ready."""
# TODO(user): Refactor Spanner instance to use this method in Create
while True:
instance, _ = self._DescribeInstance()
if json.loads(instance)['state'] == 'READY':
break
def _SetNodes(self, nodes: int) -> None:
"""Sets the number of nodes on the Spanner instance."""
# Not yet supported for user managed instances since instance attributes
# are not discovered.
if self.user_managed:
return
# Cannot set nodes with autoscaler.
if self._autoscaler:
return
current_nodes = self._GetNodes()
if nodes == current_nodes:
return
logging.info('Updating node count from %s to %s.', current_nodes, nodes)
cmd = util.GcloudCommand(
self, 'spanner', 'instances', 'update', self.instance_id
)
cmd.flags['nodes'] = nodes
cmd.Issue(raise_on_failure=True)
self._WaitUntilInstanceReady()
def UpdateCapacityForLoad(self) -> None:
"""See base class."""
self._SetNodes(self._load_nodes)
def UpdateCapacityForRun(self) -> None:
"""See base class."""
self._SetNodes(self.nodes)
def _Restore(self) -> None:
"""See base class.
Increases the number of nodes on the instance to the specified number. See
https://cloud.google.com/spanner/pricing for Spanner pricing info.
"""
self._SetNodes(self.nodes)
def _Freeze(self) -> None:
"""See base class.
Lowers the number of nodes on the instance to one. Note there are
restrictions to being able to lower the number of nodes on an instance. See
https://cloud.google.com/spanner/docs/create-manage-instances.
"""
self._SetNodes(_FROZEN_NODE_COUNT)
def _DescribeInstance(self, raise_on_failure=True) -> tuple[str, int]:
"""Returns `spanner instances describe` output for this instance."""
cmd = util.GcloudCommand(
self, 'spanner', 'instances', 'describe', self.instance_id
)
stdout, _, retcode = cmd.Issue(raise_on_failure=raise_on_failure)
return stdout, retcode
def _GetLabels(self) -> dict[str, Any]:
"""Gets labels from the current instance."""
return json.loads(self._DescribeInstance()[0]).get('labels', {})
def _GetNodes(self) -> int:
"""Gets node count from the current instance."""
return json.loads(self._DescribeInstance()[0])['nodeCount']
def _UpdateLabels(self, labels: Dict[str, Any]) -> None:
"""Updates the labels of the current instance."""
header = {'Authorization': f'Bearer {util.GetAccessToken()}'}
url = (
f'{self.GetApiEndPoint()}/v1/projects/'
f'{self.project}/instances/{self.instance_id}'
)
# Keep any existing labels
tags = self._GetLabels()
tags.update(labels)
args = {
'instance': {'labels': tags},
'fieldMask': 'labels',
}
response = requests.patch(url, headers=header, json=args)
logging.info(
'Update labels: status code %s, %s', response.status_code, response.text
)
if response.status_code != 200:
raise errors.Resource.UpdateError(
f'Unable to update Spanner instance: {response.text}'
)
def _UpdateTimeout(self, timeout_minutes: int) -> None:
"""See base class."""
labels = util.GetDefaultTags(timeout_minutes)
self._UpdateLabels(labels)
def GetResourceMetadata(self) -> Dict[Any, Any]:
"""Returns useful metadata about the instance."""
metadata = {
'gcp_spanner_name': self.instance_id,
'gcp_spanner_database': self.database,
'gcp_spanner_database_dialect': self.dialect,
'gcp_spanner_config': self._config,
'gcp_spanner_endpoint': self.GetApiEndPoint(),
'gcp_spanner_load_node_count': self._load_nodes,
}
if self._autoscaler:
metadata.update({
'gcp_spanner_autoscaler': self._autoscaler,
'gcp_spanner_min_processing_units': self._min_processing_units,
'gcp_spanner_max_processing_units': self._max_processing_units,
'gcp_spanner_high_priority_cpu_target': (
self._high_priority_cpu_target
),
'gcp_spanner_storage_target': self._storage_target,
})
else:
metadata['gcp_spanner_node_count'] = self.nodes
return metadata
def GetAverageCpuUsage(
self, duration_minutes: int, end_time: datetime.datetime
) -> float:
"""Gets the average high priority CPU usage through the time duration.
Note that there is a delay for the API to get data, so this returns the
average CPU usage in the period ending at `end_time` with missing data
in the last CPU_API_DELAY_SECONDS.
Args:
duration_minutes: The time duration for which to measure the average CPU
usage.
end_time: The ending timestamp of the workload.
Returns:
The average CPU usage during the time period.
"""
if duration_minutes * 60 <= CPU_API_DELAY_SECONDS:
raise ValueError(
f'Spanner API has a {CPU_API_DELAY_SECONDS} sec. delay in receiving'
' data, choose a longer duration to get CPU usage.'
)
client = monitoring_v3.MetricServiceClient()
# It takes up to 3 minutes for CPU metrics to appear.
cpu_query = query.Query(
client,
project=self.project,
metric_type=(
'spanner.googleapis.com/instance/cpu/utilization_by_priority'
),
end_time=end_time,
minutes=duration_minutes,
)
# Filter by high priority
cpu_query = cpu_query.select_metrics(
database=self.database, priority='high'
)
# Filter by the Spanner instance
cpu_query = cpu_query.select_resources(
instance_id=self.instance_id, project_id=self.project
)
# Aggregate user and system high priority by the minute
time_series = list(cpu_query)
# Expect 2 metrics: user and system high-priority CPU
if len(time_series) != 2:
raise errors.Benchmarks.RunError(
'Expected 2 metrics (user and system) for Spanner high-priority CPU '
f'utilization query, got {len(time_series)}'
)
logging.info('Instance %s utilization by minute:', self.instance_id)
cpu_aggregated = []
for user, system in zip(time_series[0].points, time_series[1].points):
point_utilization = user.value.double_value + system.value.double_value
point_time = datetime.datetime.fromtimestamp(
user.interval.start_time.timestamp()
)
logging.info('%s: %s', point_time, point_utilization)
cpu_aggregated.append(point_utilization)
average_cpu = statistics.mean(cpu_aggregated)
logging.info(
'Instance %s average CPU utilization: %s',
self.instance_id,
average_cpu,
)
return average_cpu
def CalculateTheoreticalMaxThroughput(
self, read_proportion: float, write_proportion: float
) -> int:
"""Returns the theoretical max throughput based on the workload and nodes.
This is the published theoretical max throughput of a Spanner node, see
https://cloud.google.com/spanner/docs/cpu-utilization#recommended-max for
more info.
Args:
read_proportion: the proportion of read requests in the workload.
write_proportion: the propoertion of write requests in the workload.
Returns:
The max total QPS taking into account the published single node limits.
"""
if read_proportion + write_proportion != 1:
raise errors.Benchmarks.RunError(
'Unrecognized workload, read + write proportion must be equal to 1, '
f'got {read_proportion} + {write_proportion}.'
)
read_ops_per_node = _READ_OPS_PER_NODE
write_ops_per_node = _WRITE_OPS_PER_NODE
if self._config in _ADJUSTED_CONFIG_REGIONS:
read_ops_per_node = _ADJUSTED_READ_OPS_PER_NODE
write_ops_per_node = _ADJUSTED_WRITE_OPS_PER_NODE
# Calculates the starting throughput based off of each node being able to
# handle 10k QPS of reads or 2k QPS of writes. For example, for a 50/50
# workload, run at a QPS target of 1666 reads + 1666 writes = 3333 (round).
a = np.array([
[1 / read_ops_per_node, 1 / write_ops_per_node],
[write_proportion, -(1 - write_proportion)],
])
b = np.array([1, 0])
result = np.linalg.solve(a, b)
return int(sum(result) * self.nodes)
class GoogleSqlGcpSpannerInstance(GcpSpannerInstance):
"""GoogleSQL-based Spanner instance."""
ENGINE = sql_engine_utils.SPANNER_GOOGLESQL
def __init__(self, db_spec: SpannerSpec, **kwargs: Any):
super().__init__(db_spec, **kwargs)
self.dialect = GOOGLESQL
def _PostCreate(self):
# Overrides BaseRelationalDB _PostCreate since client utils are
# not yet implemented.
if self.spec.db_flags:
self._ApplyDbFlags()
self._SetEndpoint()
def GetDefaultPort(self) -> int:
return 0 # Port is unused
class PostgresGcpSpannerInstance(GcpSpannerInstance):
"""PostgreSQL-based Spanner instance."""
ENGINE = sql_engine_utils.SPANNER_POSTGRES
def __init__(self, db_spec: SpannerSpec, **kwargs: Any):
super().__init__(db_spec, **kwargs)
self.dialect = POSTGRESQL
self._endpoint = 'localhost'
def GetDefaultPort(self) -> int:
return relational_db.DEFAULT_POSTGRES_PORT
def _GetDbConnectionProperties(
self,
) -> sql_engine_utils.DbConnectionProperties:
return sql_engine_utils.DbConnectionProperties(
self.spec.engine,
self.spec.engine_version,
self.endpoint,
self.port,
self.spec.database_username,
self.spec.database_password,
self.instance_id,
self.database,
self.project,
)