backend/bms_app/services/dms.py (117 lines of code) (raw):

import uuid from google.cloud import clouddms_v1 from google.api_core import operation class DMS: def __init__( self, project_id: str, region: str, dms_client: clouddms_v1.DataMigrationServiceClient = clouddms_v1.DataMigrationServiceClient(), ): self.project_id = project_id self.region = region self.client = dms_client def _get_parent(self) -> str: return f'projects/{self.project_id}/locations/{self.region}' def _get_migration_job_name(self, name: str) -> str: return self.client.migration_job_path(self.project_id, self.region, name) def _get_connection_profile_name(self, name: str) -> str: return self.client.connection_profile_path(self.project_id, self.region, name) def _get_new_request_id(self) -> str: return str(uuid.uuid4()) def get_migration_job(self, name: str) -> clouddms_v1.MigrationJob: req = clouddms_v1.GetMigrationJobRequest(name=self._get_migration_job_name(name)) return self.client.get_migration_job(request=req) def create_migration_job( self, name: str, display_name: str, source_conn: str, destination_conn: str, ) -> operation.Operation: req = clouddms_v1.CreateMigrationJobRequest( parent=self._get_parent(), migration_job_id=name, migration_job=clouddms_v1.MigrationJob( name=self._get_migration_job_name(name), labels={}, display_name=display_name, state=clouddms_v1.MigrationJob.State.DRAFT, type_=clouddms_v1.MigrationJob.Type.CONTINUOUS, source=self._get_connection_profile_name(source_conn), destination=self._get_connection_profile_name(destination_conn), static_ip_connectivity={}, source_database=clouddms_v1.DatabaseType( provider=clouddms_v1.DatabaseProvider.DATABASE_PROVIDER_UNSPECIFIED, engine=clouddms_v1.DatabaseEngine.POSTGRESQL ), destination_database=clouddms_v1.DatabaseType( provider=clouddms_v1.DatabaseProvider.CLOUDSQL, engine=clouddms_v1.DatabaseEngine.POSTGRESQL ), ), request_id=self._get_new_request_id() ) return self.client.create_migration_job(request=req) def verify_migration_job(self, name: str) -> operation.Operation: req = clouddms_v1.VerifyMigrationJobRequest(name=self._get_migration_job_name(name)) return self.client.verify_migration_job(request=req) def start_migration_job(self, name: str) -> operation.Operation: req = clouddms_v1.StartMigrationJobRequest(name=self._get_migration_job_name(name)) return self.client.start_migration_job(request=req) def get_connection_profile(self, name: str) -> clouddms_v1.ConnectionProfile: req = clouddms_v1.GetConnectionProfileRequest(name=self._get_connection_profile_name(name)) return self.client.get_connection_profile(request=req) def create_source_connection_profile( self, name: str, host: str, port: int, username: str, password: str ) -> operation.Operation: req = clouddms_v1.CreateConnectionProfileRequest( parent=self._get_parent(), connection_profile_id=name, connection_profile=clouddms_v1.ConnectionProfile( name=self._get_connection_profile_name(name), display_name=f'{host} (Waverunner)', postgresql=clouddms_v1.PostgreSqlConnectionProfile( host=host, port=port, username=username, password=password ) ), request_id=self._get_new_request_id(), ) return self.client.create_connection_profile(request=req) def create_destination_connection_profile( self, name: str, source_conn_name: str, ) -> operation.Operation: req = clouddms_v1.CreateConnectionProfileRequest( parent=self._get_parent(), connection_profile_id=name, connection_profile=clouddms_v1.ConnectionProfile( name=self._get_connection_profile_name(name), labels={}, display_name='Waverunner destination', cloudsql=clouddms_v1.CloudSqlConnectionProfile( settings=clouddms_v1.CloudSqlSettings( database_version=clouddms_v1.CloudSqlSettings.SqlDatabaseVersion.POSTGRES_12, tier='db-custom-1-3840', ip_config=clouddms_v1.SqlIpConfig( enable_ipv4=True ), data_disk_type=clouddms_v1.CloudSqlSettings.SqlDataDiskType.PD_SSD, data_disk_size_gb=20, zone='us-central1-c', source_id=self._get_connection_profile_name(source_conn_name), root_password='waverunner-test' # TODO: use secret manager ), ) ), request_id=self._get_new_request_id() ) return self.client.create_connection_profile(request=req)