func InitiateJobCleanup()

in streaming/cleanup.go [46:134]


// InitiateJobCleanup runs the cleanup steps selected in jobCleanupOptions for
// the migration job identified by migrationJobId.
//
// For every enabled option (Dataflow, Datastream, Pubsub, Monitoring) it looks
// up the stored resource records with FetchResources, decodes each record's
// payload and passes it to the matching cleanup helper. Pubsub cleanup covers
// both the PUBSUB_RESOURCE and DLQ_PUBSUB_RESOURCE records; Monitoring cleanup
// covers both the MONITORING_RESOURCE and AGG_MONITORING_RESOURCE records.
//
// Parameters:
//   - ctx: forwarded to FetchResources and to every cleanup helper.
//   - migrationJobId, dataShardIds, spannerProjectId, instance: forwarded to
//     FetchResources to select the resource records.
//   - jobCleanupOptions: selects which resource kinds are cleaned up.
//   - migrationProjectId: forwarded to the cleanup helpers.
//
// The function is best-effort: it returns nothing, and fetch or decode
// failures are logged at Debug level and skipped so that one bad record does
// not prevent the remaining resources from being processed.
func InitiateJobCleanup(ctx context.Context, migrationJobId string, dataShardIds []string, jobCleanupOptions JobCleanupOptions, migrationProjectId string, spannerProjectId string, instance string) {
	if jobCleanupOptions.Dataflow {
		// Fetch dataflow resources.
		dataflowResourcesList, err := FetchResources(ctx, migrationJobId, constants.DATAFLOW_RESOURCE, dataShardIds, spannerProjectId, instance)
		if err != nil {
			logger.Log.Debug(fmt.Sprintf("Unable to fetch dataflow resources for jobId: %s: %v\n", migrationJobId, err))
		}
		for _, resources := range dataflowResourcesList {
			var dataflowResources internal.DataflowResources
			if err := decodeCleanupPayload([]byte(resources.ResourceData), &dataflowResources); err != nil {
				logger.Log.Debug(fmt.Sprintf("Unable to read Dataflow metadata for deletion for jobId: %s: %v\n", migrationJobId, err))
				continue
			}
			cleanupDataflowJob(ctx, dataflowResources, migrationProjectId)
		}
	}

	if jobCleanupOptions.Datastream {
		// Fetch datastream resources.
		datastreamResourcesList, err := FetchResources(ctx, migrationJobId, constants.DATASTREAM_RESOURCE, dataShardIds, spannerProjectId, instance)
		if err != nil {
			logger.Log.Debug(fmt.Sprintf("Unable to fetch datastream resources for jobId: %s: %v\n", migrationJobId, err))
		}
		for _, resources := range datastreamResourcesList {
			var datastreamResources internal.DatastreamResources
			if err := decodeCleanupPayload([]byte(resources.ResourceData), &datastreamResources); err != nil {
				logger.Log.Debug(fmt.Sprintf("Unable to read Datastream metadata for deletion for jobId: %s: %v\n", migrationJobId, err))
				continue
			}
			cleanupDatastream(ctx, datastreamResources, migrationProjectId)
		}
	}

	if jobCleanupOptions.Pubsub {
		// Fetch pubsub resources, then the dead-letter-queue pubsub resources.
		// A failure of either fetch is logged and the other list is still used.
		pubsubResourcesList, err := FetchResources(ctx, migrationJobId, constants.PUBSUB_RESOURCE, dataShardIds, spannerProjectId, instance)
		if err != nil {
			logger.Log.Debug(fmt.Sprintf("Unable to fetch pubsub resources for jobId: %s: %v\n", migrationJobId, err))
		}
		dlqPubSubList, err := FetchResources(ctx, migrationJobId, constants.DLQ_PUBSUB_RESOURCE, dataShardIds, spannerProjectId, instance)
		if err != nil {
			logger.Log.Debug(fmt.Sprintf("Unable to fetch dlq pubsub resources for jobId: %s: %v\n", migrationJobId, err))
		}
		pubsubResourcesList = append(pubsubResourcesList, dlqPubSubList...)
		for _, resources := range pubsubResourcesList {
			var pubsubResources internal.PubsubResources
			if err := decodeCleanupPayload([]byte(resources.ResourceData), &pubsubResources); err != nil {
				logger.Log.Debug(fmt.Sprintf("Unable to read Pubsub metadata for deletion for jobId: %s: %v\n", migrationJobId, err))
				continue
			}
			cleanupPubsubResources(ctx, pubsubResources, migrationProjectId)
		}
	}

	if jobCleanupOptions.Monitoring {
		// Fetch the per-shard monitoring resources, then the aggregate ones.
		shardMonitoringResourcesList, err := FetchResources(ctx, migrationJobId, constants.MONITORING_RESOURCE, dataShardIds, spannerProjectId, instance)
		if err != nil {
			logger.Log.Debug(fmt.Sprintf("Unable to fetch shard monitoring resources for jobId: %s: %v\n", migrationJobId, err))
		}
		jobMonitoringResourcesList, err := FetchResources(ctx, migrationJobId, constants.AGG_MONITORING_RESOURCE, dataShardIds, spannerProjectId, instance)
		if err != nil {
			logger.Log.Debug(fmt.Sprintf("Unable to fetch aggregate monitoring resources for jobId: %s: %v\n", migrationJobId, err))
		}
		monitoringResourcesList := append(shardMonitoringResourcesList, jobMonitoringResourcesList...)
		for _, resources := range monitoringResourcesList {
			var monitoringResources internal.MonitoringResources
			if err := decodeCleanupPayload([]byte(resources.ResourceData), &monitoringResources); err != nil {
				logger.Log.Debug(fmt.Sprintf("Unable to read monitoring metadata for deletion for jobId: %s: %v\n", migrationJobId, err))
				continue
			}
			cleanupMonitoringDashboard(ctx, monitoringResources, migrationProjectId)
		}
	}
}

// decodeCleanupPayload decodes resourceData as a MinimalDowntimeResourceData
// envelope and then decodes the envelope's ResourcePayload into out, which
// must be a non-nil pointer. It reports which of the two decoding stages
// failed instead of silently dropping the envelope error.
func decodeCleanupPayload(resourceData []byte, out interface{}) error {
	var envelope MinimalDowntimeResourceData
	if err := json.Unmarshal(resourceData, &envelope); err != nil {
		return fmt.Errorf("decoding resource data: %w", err)
	}
	if err := json.Unmarshal([]byte(envelope.ResourcePayload), out); err != nil {
		return fmt.Errorf("decoding resource payload: %w", err)
	}
	return nil
}