func collectResources()

in pkg/jobmgr/util/handler/thermos.go [229:315]


func collectResources(podSpec *pod.PodSpec) (*pod.ResourceSpec, []*pod.PortSpec) {
	// Collect maximum resource and ports allocated (key'ed by port name)
	// by initial containers.
	maxInitRes := &pod.ResourceSpec{}
	initPorts := make(map[string]*pod.PortSpec)
	for _, initContainer := range podSpec.GetInitContainers() {
		res := initContainer.GetResource()
		if res.GetCpuLimit() > maxInitRes.GetCpuLimit() {
			maxInitRes.CpuLimit = res.CpuLimit
		}
		if res.GetMemLimitMb() > maxInitRes.GetMemLimitMb() {
			maxInitRes.MemLimitMb = res.MemLimitMb
		}
		if res.GetDiskLimitMb() > maxInitRes.GetDiskLimitMb() {
			maxInitRes.DiskLimitMb = res.DiskLimitMb
		}
		if res.GetFdLimit() > maxInitRes.GetFdLimit() {
			maxInitRes.FdLimit = res.FdLimit
		}
		if res.GetGpuLimit() > maxInitRes.GetGpuLimit() {
			maxInitRes.GpuLimit = res.GpuLimit
		}

		for _, port := range initContainer.GetPorts() {
			if _, ok := initPorts[port.GetName()]; !ok {
				initPorts[port.GetName()] = port
			}
		}
	}

	// Collect sum of resources and ports allocated (key'ed by port name)
	// by containers
	sumRes := &pod.ResourceSpec{}
	ports := make(map[string]*pod.PortSpec)
	for _, container := range podSpec.GetContainers() {
		res := container.GetResource()
		sumRes.CpuLimit = sumRes.GetCpuLimit() + res.GetCpuLimit()
		sumRes.MemLimitMb = sumRes.GetMemLimitMb() + res.GetMemLimitMb()
		sumRes.DiskLimitMb = sumRes.GetDiskLimitMb() + res.GetDiskLimitMb()
		sumRes.FdLimit = sumRes.GetFdLimit() + res.GetFdLimit()
		sumRes.GpuLimit = sumRes.GetGpuLimit() + res.GetGpuLimit()

		for _, port := range container.GetPorts() {
			if _, ok := ports[port.GetName()]; !ok {
				ports[port.GetName()] = port
			}
		}
	}

	// Returned resource would be max of (maxInitRes, sumRes)
	// Returned ports would be merged list of (initPorts, ports)
	resultRes := &pod.ResourceSpec{
		CpuLimit:    math.Max(maxInitRes.GetCpuLimit(), sumRes.GetCpuLimit()),
		MemLimitMb:  math.Max(maxInitRes.GetMemLimitMb(), sumRes.GetMemLimitMb()),
		DiskLimitMb: math.Max(maxInitRes.GetDiskLimitMb(), sumRes.GetDiskLimitMb()),
		GpuLimit:    math.Max(maxInitRes.GetGpuLimit(), sumRes.GetGpuLimit()),
		// Using a function here since math.Max only supports float64
		FdLimit: func() uint32 {
			if maxInitRes.GetFdLimit() > sumRes.GetFdLimit() {
				return maxInitRes.GetFdLimit()
			}
			return sumRes.GetFdLimit()
		}(),
	}

	portsMap := make(map[string]*pod.PortSpec)
	for n, p := range initPorts {
		if _, ok := portsMap[n]; !ok {
			portsMap[n] = p
		}
	}
	for n, p := range ports {
		if _, ok := portsMap[n]; !ok {
			portsMap[n] = p
		}
	}
	resultPorts := make([]*pod.PortSpec, 0, len(portsMap))
	for _, p := range portsMap {
		resultPorts = append(resultPorts, p)
	}

	// Make sure the order of the ports are consistent to avoid
	// unnecessary job restarts.
	sort.Stable(portSpecByName(resultPorts))

	return resultRes, resultPorts
}