example/mapreduce/order_info.go (81 lines of code) (raw):

/* * Copyright (c) 2023 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package main import ( "encoding/json" "fmt" "strconv" "time" "github.com/alibaba/schedulerx-worker-go/processor" "github.com/alibaba/schedulerx-worker-go/processor/jobcontext" "github.com/alibaba/schedulerx-worker-go/processor/mapjob" "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" ) var _ processor.MapReduceJobProcessor = &TestMapReduceJob{} type OrderInfo struct { Id string `json:"id"` Value int `json:"value"` } func NewOrderInfo(id string, value int) *OrderInfo { return &OrderInfo{Id: id, Value: value} } type TestMapReduceJob struct { *mapjob.MapReduceJobProcessor } func (mr *TestMapReduceJob) Kill(jobCtx *jobcontext.JobContext) error { //TODO implement me panic("implement me") } // Process the MapReduce model is used to distributed scan orders for timeout confirmation func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) { var ( num = 1000 err error ) taskName := jobCtx.TaskName() if jobCtx.JobParameters() != "" { num, err = strconv.Atoi(jobCtx.JobParameters()) if err != nil { return nil, err } } if mr.IsRootTask(jobCtx) { fmt.Println("start root task") var orderInfos []interface{} for i := 1; i <= num; i++ { orderInfos = append(orderInfos, NewOrderInfo(fmt.Sprintf("id_%d", i), i)) } return mr.Map(jobCtx, orderInfos, "OrderInfo") } else if taskName == "OrderInfo" { orderInfo := new(OrderInfo) if err := json.Unmarshal(jobCtx.Task(), orderInfo); err != nil { fmt.Printf("task is not OrderInfo, task=%+v\n", jobCtx.Task()) } fmt.Printf("orderInfo=%+v\n", orderInfo) time.Sleep(10 * time.Millisecond) // fmt.Println("Finish Process...") return processor.NewProcessResult( processor.WithSucceed(), processor.WithResult(strconv.Itoa(orderInfo.Value)), ), nil } return processor.NewProcessResult(processor.WithFailed()), nil } func (mr *TestMapReduceJob) Reduce(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) { allTaskResults := jobCtx.TaskResults() allTaskStatuses := jobCtx.TaskStatuses() count := 0 fmt.Printf("reduce: all task count=%d\n", len(allTaskResults)) for key, val := range allTaskResults { if key == 0 { continue } if allTaskStatuses[key] == taskstatus.TaskStatusSucceed { num, err := strconv.Atoi(val) if err != nil { return nil, err } count += num } } fmt.Printf("reduce: succeed task count=%d\n", count) return processor.NewProcessResult( processor.WithSucceed(), processor.WithResult(strconv.Itoa(count)), ), nil }