service/matching/taskWriter.go (221 lines of code) (raw):

// Copyright (c) 2020 Uber Technologies, Inc. // Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package matching import ( "context" "errors" "fmt" "sync/atomic" "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" ) type ( writeTaskResponse struct { err error persistenceResponse *persistence.CreateTasksResponse } writeTaskRequest struct { execution *types.WorkflowExecution taskInfo *persistence.TaskInfo responseCh chan<- *writeTaskResponse } taskIDBlock struct { start int64 end int64 } // taskWriter writes tasks sequentially to persistence taskWriter struct { db *taskListDB config *taskListConfig taskListID *taskListID taskAckManager messaging.AckManager appendCh chan *writeTaskRequest taskIDBlock taskIDBlock maxReadLevel int64 stopped int64 // set to 1 if the writer is stopped or is shutting down logger log.Logger scope metrics.Scope stopCh chan struct{} // shutdown signal for all routines in this class throttleRetry *backoff.ThrottleRetry handleErr func(error) error onFatalErr func() } ) // errShutdown indicates that the task list is shutting down var errShutdown = errors.New("task list shutting down") func newTaskWriter(tlMgr *taskListManagerImpl) *taskWriter { return &taskWriter{ db: tlMgr.db, config: tlMgr.config, taskListID: tlMgr.taskListID, taskAckManager: tlMgr.taskAckManager, stopCh: make(chan struct{}), appendCh: make(chan *writeTaskRequest, tlMgr.config.OutstandingTaskAppendsThreshold()), logger: tlMgr.logger, scope: tlMgr.scope, handleErr: tlMgr.handleErr, onFatalErr: tlMgr.Stop, throttleRetry: backoff.NewThrottleRetry( backoff.WithRetryPolicy(persistenceOperationRetryPolicy), backoff.WithRetryableError(persistence.IsTransientError), ), } } func (w *taskWriter) Start() error { // Make sure to grab the range first before starting task writer, as it needs the range to initialize maxReadLevel state, err := w.renewLeaseWithRetry() if err != nil { return err } w.taskAckManager.SetAckLevel(state.ackLevel) block := rangeIDToTaskIDBlock(state.rangeID, w.config.RangeSize) w.taskIDBlock = block w.maxReadLevel = block.start - 1 go w.taskWriterLoop() return nil } // Stop stops the taskWriter func (w *taskWriter) Stop() { if atomic.CompareAndSwapInt64(&w.stopped, 0, 1) { close(w.stopCh) } } func (w *taskWriter) isStopped() bool { return atomic.LoadInt64(&w.stopped) == 1 } func (w *taskWriter) appendTask(execution *types.WorkflowExecution, taskInfo *persistence.TaskInfo) (*persistence.CreateTasksResponse, error) { if w.isStopped() { return nil, errShutdown } ch := make(chan *writeTaskResponse) req := &writeTaskRequest{ execution: execution, taskInfo: taskInfo, responseCh: ch, } select { case w.appendCh <- req: select { case r := <-ch: return r.persistenceResponse, r.err case <-w.stopCh: // if we are shutting down, this request will never make // it to cassandra, just bail out and fail this request return nil, errShutdown } default: // channel is full, throttle return nil, createServiceBusyError("Too many outstanding appends to the TaskList") } } func (w *taskWriter) GetMaxReadLevel() int64 { return atomic.LoadInt64(&w.maxReadLevel) } func (w *taskWriter) allocTaskIDs(count int) ([]int64, error) { result := make([]int64, count) for i := 0; i < count; i++ { if w.taskIDBlock.start > w.taskIDBlock.end { // we ran out of current allocation block newBlock, err := w.allocTaskIDBlock(w.taskIDBlock.end) if err != nil { return nil, err } w.taskIDBlock = newBlock } result[i] = w.taskIDBlock.start w.taskIDBlock.start++ } return result, nil } func (w *taskWriter) allocTaskIDBlock(prevBlockEnd int64) (taskIDBlock, error) { currBlock := rangeIDToTaskIDBlock(w.db.RangeID(), w.config.RangeSize) if currBlock.end != prevBlockEnd { return taskIDBlock{}, fmt.Errorf("allocTaskIDBlock: invalid state: prevBlockEnd:%v != currTaskIDBlock:%+v", prevBlockEnd, currBlock) } state, err := w.renewLeaseWithRetry() if err != nil { return taskIDBlock{}, err } return rangeIDToTaskIDBlock(state.rangeID, w.config.RangeSize), nil } func (w *taskWriter) renewLeaseWithRetry() (taskListState, error) { var newState taskListState op := func() (err error) { newState, err = w.db.RenewLease() return } w.scope.IncCounter(metrics.LeaseRequestPerTaskListCounter) err := w.throttleRetry.Do(context.Background(), op) if err != nil { w.scope.IncCounter(metrics.LeaseFailurePerTaskListCounter) w.onFatalErr() return newState, err } return newState, nil } func (w *taskWriter) taskWriterLoop() { writerLoop: for { select { case request := <-w.appendCh: { // read a batch of requests from the channel reqs := []*writeTaskRequest{request} reqs = w.getWriteBatch(reqs) batchSize := len(reqs) maxReadLevel := int64(0) taskIDs, err := w.allocTaskIDs(batchSize) if err != nil { w.logger.Error("error allocating task ids", tag.Error(err), ) w.sendWriteResponse(reqs, err, nil) continue writerLoop } tasks := []*persistence.CreateTaskInfo{} for i, req := range reqs { tasks = append(tasks, &persistence.CreateTaskInfo{ TaskID: taskIDs[i], Execution: *req.execution, Data: req.taskInfo, }) maxReadLevel = taskIDs[i] } r, err := w.db.CreateTasks(tasks) err = w.handleErr(err) if err != nil { w.logger.Error("Persistent store operation failure", tag.StoreOperationCreateTasks, tag.Error(err), tag.Number(taskIDs[0]), tag.NextNumber(taskIDs[batchSize-1]), ) } // Update the maxReadLevel after the writes are completed. if maxReadLevel > 0 { atomic.StoreInt64(&w.maxReadLevel, maxReadLevel) } w.sendWriteResponse(reqs, err, r) } case <-w.stopCh: // we don't close the appendCh here // because that can cause on a send on closed // channel panic on the appendTask() break writerLoop } } } func (w *taskWriter) getWriteBatch(reqs []*writeTaskRequest) []*writeTaskRequest { readLoop: for i := 0; i < w.config.MaxTaskBatchSize(); i++ { select { case req := <-w.appendCh: reqs = append(reqs, req) default: // channel is empty, don't block break readLoop } } return reqs } func (w *taskWriter) sendWriteResponse(reqs []*writeTaskRequest, err error, persistenceResponse *persistence.CreateTasksResponse) { for _, req := range reqs { resp := &writeTaskResponse{ err: err, persistenceResponse: persistenceResponse, } req.responseCh <- resp } }