in batch/create_with_persistent_disk.go [29:131]
func createJobWithPD(w io.Writer, projectID, jobName, pdName string) error {
// jobName := job-name
// pdName := disk-name
ctx := context.Background()
batchClient, err := batch.NewClient(ctx)
if err != nil {
return fmt.Errorf("batchClient error: %w", err)
}
defer batchClient.Close()
runn := &batchpb.Runnable{
Executable: &batchpb.Runnable_Script_{
Script: &batchpb.Runnable_Script{
Command: &batchpb.Runnable_Script_Text{
Text: "echo Hello world from script 1 for task ${BATCH_TASK_INDEX}",
},
},
},
}
volume := &batchpb.Volume{
MountPath: fmt.Sprintf("/mnt/disks/%v", pdName),
Source: &batchpb.Volume_DeviceName{
DeviceName: pdName,
},
}
// The disk type of the new persistent disk, either pd-standard,
// pd-balanced, pd-ssd, or pd-extreme. For Batch jobs, the default is pd-balanced
disk := &batchpb.AllocationPolicy_Disk{
Type: "pd-balanced",
SizeGb: 10,
}
taskSpec := &batchpb.TaskSpec{
ComputeResource: &batchpb.ComputeResource{
// CpuMilli is milliseconds per cpu-second. This means the task requires 1 CPU.
CpuMilli: 1000,
MemoryMib: 16,
},
MaxRunDuration: &durationpb.Duration{
Seconds: 3600,
},
MaxRetryCount: 2,
Runnables: []*batchpb.Runnable{runn},
Volumes: []*batchpb.Volume{volume},
}
taskGroups := []*batchpb.TaskGroup{
{
TaskCount: 4,
TaskSpec: taskSpec,
},
}
labels := map[string]string{"env": "testing", "type": "container"}
// Policies are used to define on what kind of virtual machines the tasks will run on.
// Read more about local disks here: https://cloud.google.com/compute/docs/disks/persistent-disks
allocationPolicy := &batchpb.AllocationPolicy{
Instances: []*batchpb.AllocationPolicy_InstancePolicyOrTemplate{{
PolicyTemplate: &batchpb.AllocationPolicy_InstancePolicyOrTemplate_Policy{
Policy: &batchpb.AllocationPolicy_InstancePolicy{
MachineType: "n1-standard-1",
Disks: []*batchpb.AllocationPolicy_AttachedDisk{
{
Attached: &batchpb.AllocationPolicy_AttachedDisk_NewDisk{
NewDisk: disk,
},
DeviceName: pdName,
},
},
},
},
}},
}
// We use Cloud Logging as it's an out of the box available option
logsPolicy := &batchpb.LogsPolicy{
Destination: batchpb.LogsPolicy_CLOUD_LOGGING,
}
job := &batchpb.Job{
Name: jobName,
TaskGroups: taskGroups,
AllocationPolicy: allocationPolicy,
Labels: labels,
LogsPolicy: logsPolicy,
}
request := &batchpb.CreateJobRequest{
Parent: fmt.Sprintf("projects/%s/locations/%s", projectID, "us-central1"),
JobId: jobName,
Job: job,
}
created_job, err := batchClient.CreateJob(ctx, request)
if err != nil {
return fmt.Errorf("unable to create job: %w", err)
}
fmt.Fprintf(w, "Job created: %v\n", created_job)
return nil
}