func getTableTask()

in state/lock.go [31:80]


func getTableTask(workerID string, lockTimeout time.Duration) (*Row, error) {
	tx, err := mgr.conn.Begin()
	if log.E(err) {
		return nil, err
	}
	defer func() { _ = tx.Rollback() }()
	var id int64
	var add string
	if CheckMySQLVersion("8") {
		add = " SKIP LOCKED"
	}
	ir := util.QueryTxRowSQL(tx, `SELECT id FROM state WHERE deleted_at IS NULL AND (locked_at IS NULL OR locked_at < NOW() - INTERVAL ? SECOND) AND (input = 'mysql' OR snapshotted_at IS NULL OR need_snapshot OR (params->"$.Schedule.Interval" > 0 AND snapshotted_at + INTERVAL params->"$.Schedule.Interval" SECOND < NOW())) LIMIT 1 FOR UPDATE`+add, lockTimeout/time.Second)
	err = ir.Scan(&id)
	if err != nil && err == sql.ErrNoRows {
		return nil, nil // no tasks
	}
	if log.E(err) {
		return nil, err
	}
	r, err := util.QueryTxSQL(tx, stateSQL+` WHERE state.id=?`, id)
	if log.E(err) {
		return nil, err
	}
	srow, err := parseRows(r)
	if log.E(err) {
		return nil, err
	}
	if len(srow) == 0 {
		return nil, nil // no tasks
	}
	row := &srow[0]
	tm := time.Now().Round(time.Second)
	if row.TimeForSnapshot(tm) {
		err = advanceSnapshottedAt(tx, row.ID, tm)
		if log.E(err) {
			return nil, err
		}
		row.NeedSnapshot = true
		row.SnapshottedAt = tm
	}
	err = util.ExecTxSQL(tx, `UPDATE state SET locked_at=NOW(), worker_id=? WHERE id=?`, workerID, row.ID)
	if log.E(err) {
		return nil, err
	}
	err = tx.Commit()
	if log.E(err) {
		return nil, err
	}
	return row, nil
}