func()

in vermeer/apps/graphio/local_file.go [50:139]


// MakeTasks builds the load partitions for locally stored graph files.
//
// It reads the "load.vertex_files" and then the "load.edge_files" entry of
// params through options.GetMapString. Each map key is stored in the
// partition's IpAddr and each value is a file spec, expanded by
// expandLocalFileSpec into one or more paths. Every path becomes one
// LoadPartition whose Params["load.file_path"] holds that path.
//
// Partition IDs start at 1 and increase by one per partition, vertex
// partitions first. Go map iteration order is unspecified, so which file gets
// which ID within a group may differ between runs.
//
// It returns a non-nil (possibly empty) slice on success, or nil and an error
// when a range spec cannot be parsed.
func (a *LocalMaker) MakeTasks(params map[string]string, taskID int32) ([]LoadPartition, error) {
	partID := int32(1)
	loadParts := make([]LoadPartition, 0)

	// addParts expands every file spec stored under paramKey and appends one
	// partition per resulting path. The caller supplies initPart so that the
	// partition type constant stays at the call site.
	addParts := func(paramKey string, initPart func(part *LoadPartition, id int32)) error {
		for ip, files := range options.GetMapString(params, paramKey) {
			paths, err := expandLocalFileSpec(files)
			if err != nil {
				return err
			}
			for _, path := range paths {
				part := LoadPartition{}
				initPart(&part, partID)
				part.IpAddr = ip
				part.Params = map[string]string{"load.file_path": path}
				loadParts = append(loadParts, part)
				partID++
			}
		}
		return nil
	}

	err := addParts("load.vertex_files", func(part *LoadPartition, id int32) {
		part.Init(id, taskID, LoadPartTypeVertex)
	})
	if err != nil {
		return nil, err
	}

	err = addParts("load.edge_files", func(part *LoadPartition, id int32) {
		part.Init(id, taskID, LoadPartTypeEdge)
	})
	if err != nil {
		return nil, err
	}

	return loadParts, nil
}

// expandLocalFileSpec turns one file spec into the list of paths it denotes.
//
// A spec without a trailing "[start,end]" group is returned unchanged as a
// single path. Otherwise the text before the last "[" is used as a prefix and
// one path is produced for every integer in start..end (inclusive), with the
// number rendered by common.ItoaPad using the length of the end token as its
// width argument. Text after the closing "]" is not carried over. A range
// whose start is greater than its end yields no paths.
//
// Parse failures are logged and returned as an error.
func expandLocalFileSpec(files string) ([]string, error) {
	bIdx := strings.LastIndex(files, "[")
	eIdx := strings.LastIndex(files, "]")
	// Without a "[" followed by a later "]" there is no range to expand. The
	// eIdx < bIdx test also covers a "]" that precedes the last "[", which
	// would otherwise make the slice expression below panic.
	if bIdx < 0 || eIdx < bIdx {
		return []string{files}, nil
	}

	// fail logs msg and returns it as the error. The message is passed as an
	// argument, not as the format, so a "%" inside a path is logged verbatim.
	fail := func(msg string) ([]string, error) {
		logrus.Errorf("%s", msg)
		return nil, errors.New(msg)
	}

	bounds := strings.Split(files[bIdx+1:eIdx], ",")
	if len(bounds) != 2 {
		return fail(fmt.Sprintf("MakeTask LoadTypeLocal parse file error: %s", files))
	}
	start, err := strconv.Atoi(bounds[0])
	if err != nil {
		return fail(fmt.Sprintf("MakeTask LoadTypeLocal parse file error: %s, %s", files, err))
	}
	end, err := strconv.Atoi(bounds[1])
	if err != nil {
		return fail(fmt.Sprintf("MakeTask LoadTypeLocal parse file error: %s, %s", files, err))
	}

	prefix := files[:bIdx]
	width := len(bounds[1])
	var paths []string
	for i := start; i <= end; i++ {
		paths = append(paths, prefix+common.ItoaPad(i, width))
	}
	return paths, nil
}