in streaming/store.go [159:227]
func writeJobResources(ctx context.Context, migrationJobId string, dataShardId string, dataflowResources internal.DataflowResources, datastreamResources internal.DatastreamResources, gcsResources internal.GcsResources, pubsubResources internal.PubsubResources, dlqPubsubResources internal.PubsubResources, monitoringResources internal.MonitoringResources, createTimestamp time.Time, client *spanner.Client) error {
datastreamResourcesBytes, err := json.Marshal(datastreamResources)
if err != nil {
logger.Log.Error(fmt.Sprintf("can't marshal datastream resources for data shard %s: %v\n", dataShardId, err))
return err
}
dataflowResourcesBytes, err := json.Marshal(dataflowResources)
if err != nil {
logger.Log.Error(fmt.Sprintf("can't marshal dataflow resources for data shard %s: %v\n", dataShardId, err))
return err
}
gcsResourcesBytes, err := json.Marshal(gcsResources)
if err != nil {
logger.Log.Error(fmt.Sprintf("can't marshal gcs resources for data shard %s: %v\n", dataShardId, err))
return err
}
pubsubResourcesBytes, err := json.Marshal(pubsubResources)
if err != nil {
logger.Log.Error(fmt.Sprintf("can't marshal pubsub resources for data shard %s: %v\n", dataShardId, err))
return err
}
dlqPubsubResourcesBytes, err := json.Marshal(dlqPubsubResources)
if err != nil {
logger.Log.Error(fmt.Sprintf("can't marshal pubsub resources for data shard %s: %v\n", dataShardId, err))
return err
}
monitoringResourcesBytes, err := json.Marshal(monitoringResources)
if err != nil {
logger.Log.Error(fmt.Sprintf("can't marshal monitoring resources for data shard %s: %v\n", dataShardId, err))
return err
}
logger.Log.Debug(fmt.Sprintf("Storing generated resources for data shard %s...\n", dataShardId))
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
datastreamMutation, err := createResourceMutation(migrationJobId, datastreamResources.DatastreamName, constants.DATASTREAM_RESOURCE, datastreamResources.DatastreamName, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(datastreamResourcesBytes)})
if err != nil {
return err
}
dataflowMutation, err := createResourceMutation(migrationJobId, dataflowResources.JobId, constants.DATAFLOW_RESOURCE, dataflowResources.JobId, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(dataflowResourcesBytes)})
if err != nil {
return err
}
gcsMutation, err := createResourceMutation(migrationJobId, gcsResources.BucketName, constants.GCS_RESOURCE, gcsResources.BucketName, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(gcsResourcesBytes)})
if err != nil {
return err
}
pubsubMutation, errr := createResourceMutation(migrationJobId, pubsubResources.TopicId, constants.PUBSUB_RESOURCE, pubsubResources.TopicId, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(pubsubResourcesBytes)})
if errr != nil {
return errr
}
dlqPubsubMutation, errr := createResourceMutation(migrationJobId, dlqPubsubResources.TopicId, constants.DLQ_PUBSUB_RESOURCE, dlqPubsubResources.TopicId, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(dlqPubsubResourcesBytes)})
if errr != nil {
return errr
}
monitoringMutation, err := createResourceMutation(migrationJobId, monitoringResources.DashboardName, constants.MONITORING_RESOURCE, monitoringResources.DashboardName, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(monitoringResourcesBytes)})
if err != nil {
return err
}
err = txn.BufferWrite([]*spanner.Mutation{datastreamMutation, dataflowMutation, gcsMutation, pubsubMutation, dlqPubsubMutation, monitoringMutation})
if err != nil {
return err
}
return nil
})
if err != nil {
logger.Log.Error(fmt.Sprintf("can't store generated resources for data shard %s: %v\n", dataShardId, err))
return err
}
return nil
}