in vermeer/apps/graphio/local_file.go [50:139]
// MakeTasks builds the list of load partitions for locally stored graph files.
//
// It reads two collections from params via options.GetMapString:
// "load.vertex_files" and "load.edge_files". Each entry pairs a worker address
// with a file spec (see expandLocalFileSpec). Every path produced by a spec
// becomes one LoadPartition with IpAddr set to that address and
// Params["load.file_path"] set to the path. Vertex partitions are emitted
// before edge partitions, and partition IDs are assigned sequentially starting
// at 1 across both groups.
//
// A malformed spec is logged through logrus and returned as an error; no
// partitions are returned in that case.
//
// NOTE(review): if GetMapString returns a Go map, the order in which addresses
// are visited (and therefore which address gets which partition ID) is
// unspecified and may differ between calls.
func (a *LocalMaker) MakeTasks(params map[string]string, taskID int32) ([]LoadPartition, error) {
	partID := int32(1)
	loadParts := make([]LoadPartition, 0)

	// appendParts expands every spec stored under key and appends one
	// partition per resulting path. It shares partID and loadParts with the
	// enclosing function so IDs keep increasing across vertex and edge files.
	appendParts := func(key string, isVertex bool) error {
		for ip, files := range options.GetMapString(params, key) {
			paths, err := expandLocalFileSpec(files)
			if err != nil {
				return err
			}
			for _, path := range paths {
				part := LoadPartition{}
				if isVertex {
					part.Init(partID, taskID, LoadPartTypeVertex)
				} else {
					part.Init(partID, taskID, LoadPartTypeEdge)
				}
				part.IpAddr = ip
				part.Params = map[string]string{"load.file_path": path}
				loadParts = append(loadParts, part)
				partID++
			}
		}
		return nil
	}

	if err := appendParts("load.vertex_files", true); err != nil {
		return nil, err
	}
	if err := appendParts("load.edge_files", false); err != nil {
		return nil, err
	}
	return loadParts, nil
}

// expandLocalFileSpec turns one file spec into the concrete file paths it names.
//
// A spec that lacks either '[' or ']' is returned unchanged as a single path.
// Otherwise the text between the last '[' and the last ']' must have the form
// "start,end" with two integers. One path is produced for every i in
// [start, end], built from the text before the '[' followed by
// common.ItoaPad(i, len(end text)). Text after the ']' is not carried into the
// generated paths, and start > end yields no paths.
//
// Parse failures are logged and returned as an error.
func expandLocalFileSpec(files string) ([]string, error) {
	// fail logs the message and returns it as an error. The message is passed
	// to logrus as data, never as a format string, so a '%' inside a file
	// path cannot corrupt the log line.
	fail := func(format string, args ...interface{}) ([]string, error) {
		msg := fmt.Sprintf(format, args...)
		logrus.Error(msg)
		return nil, errors.New(msg)
	}

	bIdx := strings.LastIndex(files, "[")
	eIdx := strings.LastIndex(files, "]")
	if bIdx < 0 || eIdx < 0 {
		return []string{files}, nil
	}
	// A ']' that precedes the '[' would make the slice expression below
	// panic; report it as a parse error instead.
	if eIdx < bIdx {
		return fail("MakeTask LoadTypeLocal parse file error: %s", files)
	}

	bounds := strings.Split(files[bIdx+1:eIdx], ",")
	if len(bounds) != 2 {
		return fail("MakeTask LoadTypeLocal parse file error: %s", files)
	}
	start, err := strconv.Atoi(bounds[0])
	if err != nil {
		return fail("MakeTask LoadTypeLocal parse file error: %s, %s", files, err)
	}
	end, err := strconv.Atoi(bounds[1])
	if err != nil {
		return fail("MakeTask LoadTypeLocal parse file error: %s, %s", files, err)
	}

	prefix := files[:bIdx]
	// The width of the generated number follows the textual width of the
	// upper bound, e.g. "[0,09]" pads to two characters.
	width := len(bounds[1])
	var paths []string
	for i := start; i <= end; i++ {
		paths = append(paths, prefix+common.ItoaPad(i, width))
	}
	return paths, nil
}