func writeJobResources()

in streaming/store.go [159:227]


// writeJobResources stores the resources generated for one data shard of a
// migration job.
//
// Each resource struct (datastream, dataflow, GCS, pubsub, DLQ pubsub and
// monitoring) is JSON-encoded and wrapped in a MinimalDowntimeResourceData
// payload together with dataShardId. One mutation per resource is built with
// createResourceMutation and all six are buffered in a single read-write
// transaction on client, so they are committed together or not at all.
//
// createTimestamp is accepted for interface compatibility but is not read by
// this function.
//
// Every failure is logged through logger.Log.Error and then returned to the
// caller unchanged; nil is returned once the transaction has completed.
func writeJobResources(ctx context.Context, migrationJobId string, dataShardId string, dataflowResources internal.DataflowResources, datastreamResources internal.DatastreamResources, gcsResources internal.GcsResources, pubsubResources internal.PubsubResources, dlqPubsubResources internal.PubsubResources, monitoringResources internal.MonitoringResources, createTimestamp time.Time, client *spanner.Client) error {
	// marshalResource JSON-encodes a single resource struct. On failure it logs
	// which kind of resource could not be encoded before handing the error back.
	marshalResource := func(kind string, resource interface{}) ([]byte, error) {
		payload, err := json.Marshal(resource)
		if err != nil {
			logger.Log.Error(fmt.Sprintf("can't marshal %s resources for data shard %s: %v\n", kind, dataShardId, err))
			return nil, err
		}
		return payload, nil
	}

	// All payloads are encoded before the transaction is opened, so an encoding
	// failure never touches the database.
	datastreamResourcesBytes, err := marshalResource("datastream", datastreamResources)
	if err != nil {
		return err
	}
	dataflowResourcesBytes, err := marshalResource("dataflow", dataflowResources)
	if err != nil {
		return err
	}
	gcsResourcesBytes, err := marshalResource("gcs", gcsResources)
	if err != nil {
		return err
	}
	pubsubResourcesBytes, err := marshalResource("pubsub", pubsubResources)
	if err != nil {
		return err
	}
	// Labelled separately from the regular pubsub topic so the log identifies
	// which of the two PubsubResources values failed to encode.
	dlqPubsubResourcesBytes, err := marshalResource("dlq pubsub", dlqPubsubResources)
	if err != nil {
		return err
	}
	monitoringResourcesBytes, err := marshalResource("monitoring", monitoringResources)
	if err != nil {
		return err
	}

	logger.Log.Debug(fmt.Sprintf("Storing generated resources for data shard %s...\n", dataShardId))
	// NOTE(review): per the Spanner client docs the callback may be invoked more
	// than once if the transaction aborts; it only builds and buffers mutations.
	_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		// For every resource the same identifier is passed as both the second
		// and the fourth argument of createResourceMutation.
		datastreamMutation, err := createResourceMutation(migrationJobId, datastreamResources.DatastreamName, constants.DATASTREAM_RESOURCE, datastreamResources.DatastreamName, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(datastreamResourcesBytes)})
		if err != nil {
			return err
		}
		dataflowMutation, err := createResourceMutation(migrationJobId, dataflowResources.JobId, constants.DATAFLOW_RESOURCE, dataflowResources.JobId, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(dataflowResourcesBytes)})
		if err != nil {
			return err
		}
		gcsMutation, err := createResourceMutation(migrationJobId, gcsResources.BucketName, constants.GCS_RESOURCE, gcsResources.BucketName, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(gcsResourcesBytes)})
		if err != nil {
			return err
		}
		pubsubMutation, err := createResourceMutation(migrationJobId, pubsubResources.TopicId, constants.PUBSUB_RESOURCE, pubsubResources.TopicId, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(pubsubResourcesBytes)})
		if err != nil {
			return err
		}
		dlqPubsubMutation, err := createResourceMutation(migrationJobId, dlqPubsubResources.TopicId, constants.DLQ_PUBSUB_RESOURCE, dlqPubsubResources.TopicId, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(dlqPubsubResourcesBytes)})
		if err != nil {
			return err
		}
		monitoringMutation, err := createResourceMutation(migrationJobId, monitoringResources.DashboardName, constants.MONITORING_RESOURCE, monitoringResources.DashboardName, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(monitoringResourcesBytes)})
		if err != nil {
			return err
		}
		return txn.BufferWrite([]*spanner.Mutation{datastreamMutation, dataflowMutation, gcsMutation, pubsubMutation, dlqPubsubMutation, monitoringMutation})
	})
	if err != nil {
		logger.Log.Error(fmt.Sprintf("can't store generated resources for data shard %s: %v\n", dataShardId, err))
		return err
	}
	return nil
}