in streaming/cleanup.go [46:134]
// InitiateJobCleanup cleans up the resources recorded for a migration job.
//
// For every resource category enabled in jobCleanupOptions (Dataflow,
// Datastream, Pubsub, Monitoring) it looks up the stored resource records via
// FetchResources, decodes each record's payload, and hands the decoded
// resource to the matching cleanup function together with migrationProjectId.
//
// Cleanup is best-effort: the function returns nothing, and fetch or decode
// failures are logged at debug level and do not stop the remaining cleanup.
// A record whose envelope or payload cannot be decoded is skipped.
//
// Parameters:
//   - ctx: context passed through to the fetch and cleanup calls.
//   - migrationJobId: identifier of the job whose resources are looked up.
//   - dataShardIds: shard identifiers forwarded to FetchResources.
//   - jobCleanupOptions: selects which resource categories are cleaned up.
//   - migrationProjectId: project passed to each cleanup function.
//   - spannerProjectId, instance: forwarded to FetchResources.
func InitiateJobCleanup(ctx context.Context, migrationJobId string, dataShardIds []string, jobCleanupOptions JobCleanupOptions, migrationProjectId string, spannerProjectId string, instance string) {
	if jobCleanupOptions.Dataflow {
		// Fetch dataflow resources.
		dataflowResourcesList, err := FetchResources(ctx, migrationJobId, constants.DATAFLOW_RESOURCE, dataShardIds, spannerProjectId, instance)
		if err != nil {
			logger.Log.Debug(fmt.Sprintf("Unable to fetch dataflow resources for jobId: %s: %v\n", migrationJobId, err))
		}
		// Clean up whatever was fetched; the list is empty when the fetch failed.
		for _, resources := range dataflowResourcesList {
			var dataflowResources internal.DataflowResources
			if err := decodeResourcePayload([]byte(resources.ResourceData), &dataflowResources); err != nil {
				logger.Log.Debug(fmt.Sprintf("Unable to read Dataflow metadata for deletion: %v\n", err))
				continue
			}
			cleanupDataflowJob(ctx, dataflowResources, migrationProjectId)
		}
	}

	if jobCleanupOptions.Datastream {
		// Fetch datastream resources.
		datastreamResourcesList, err := FetchResources(ctx, migrationJobId, constants.DATASTREAM_RESOURCE, dataShardIds, spannerProjectId, instance)
		if err != nil {
			logger.Log.Debug(fmt.Sprintf("Unable to fetch datastream resources for jobId: %s: %v\n", migrationJobId, err))
		}
		for _, resources := range datastreamResourcesList {
			var datastreamResources internal.DatastreamResources
			if err := decodeResourcePayload([]byte(resources.ResourceData), &datastreamResources); err != nil {
				logger.Log.Debug(fmt.Sprintf("Unable to read Datastream metadata for deletion: %v\n", err))
				continue
			}
			cleanupDatastream(ctx, datastreamResources, migrationProjectId)
		}
	}

	if jobCleanupOptions.Pubsub {
		// Fetch the regular and the DLQ pubsub resources; both go through the
		// same cleanup routine.
		pubsubResourcesList, err := FetchResources(ctx, migrationJobId, constants.PUBSUB_RESOURCE, dataShardIds, spannerProjectId, instance)
		if err != nil {
			logger.Log.Debug(fmt.Sprintf("Unable to fetch pubsub resources for jobId: %s: %v\n", migrationJobId, err))
		}
		dlqPubSubList, err := FetchResources(ctx, migrationJobId, constants.DLQ_PUBSUB_RESOURCE, dataShardIds, spannerProjectId, instance)
		if err != nil {
			logger.Log.Debug(fmt.Sprintf("Unable to fetch dlq pubsub resources for jobId: %s: %v\n", migrationJobId, err))
		}
		pubsubResourcesList = append(pubsubResourcesList, dlqPubSubList...)
		for _, resources := range pubsubResourcesList {
			var pubsubResources internal.PubsubResources
			if err := decodeResourcePayload([]byte(resources.ResourceData), &pubsubResources); err != nil {
				logger.Log.Debug(fmt.Sprintf("Unable to read Pubsub metadata for deletion: %v\n", err))
				continue
			}
			cleanupPubsubResources(ctx, pubsubResources, migrationProjectId)
		}
	}

	if jobCleanupOptions.Monitoring {
		// Fetch the per-shard and the aggregate monitoring resources; both go
		// through the same cleanup routine.
		shardMonitoringResourcesList, err := FetchResources(ctx, migrationJobId, constants.MONITORING_RESOURCE, dataShardIds, spannerProjectId, instance)
		if err != nil {
			logger.Log.Debug(fmt.Sprintf("Unable to fetch shard monitoring resources for jobId: %s: %v\n", migrationJobId, err))
		}
		jobMonitoringResourcesList, err := FetchResources(ctx, migrationJobId, constants.AGG_MONITORING_RESOURCE, dataShardIds, spannerProjectId, instance)
		if err != nil {
			logger.Log.Debug(fmt.Sprintf("Unable to fetch aggregate monitoring resources for jobId: %s: %v\n", migrationJobId, err))
		}
		monitoringResourcesList := append(shardMonitoringResourcesList, jobMonitoringResourcesList...)
		for _, resources := range monitoringResourcesList {
			var monitoringResources internal.MonitoringResources
			if err := decodeResourcePayload([]byte(resources.ResourceData), &monitoringResources); err != nil {
				logger.Log.Debug(fmt.Sprintf("Unable to read monitoring metadata for deletion: %v\n", err))
				continue
			}
			cleanupMonitoringDashboard(ctx, monitoringResources, migrationProjectId)
		}
	}
}

// decodeResourcePayload decodes resourceData as a MinimalDowntimeResourceData
// envelope and then decodes the envelope's ResourcePayload into out, which
// must be a pointer to the destination value. It returns a wrapped error
// identifying which of the two decoding stages failed.
func decodeResourcePayload(resourceData []byte, out interface{}) error {
	var envelope MinimalDowntimeResourceData
	if err := json.Unmarshal(resourceData, &envelope); err != nil {
		return fmt.Errorf("decoding resource envelope: %w", err)
	}
	if err := json.Unmarshal([]byte(envelope.ResourcePayload), out); err != nil {
		return fmt.Errorf("decoding resource payload: %w", err)
	}
	return nil
}