in state/lock.go [31:80]
func getTableTask(workerID string, lockTimeout time.Duration) (*Row, error) {
tx, err := mgr.conn.Begin()
if log.E(err) {
return nil, err
}
defer func() { _ = tx.Rollback() }()
var id int64
var add string
if CheckMySQLVersion("8") {
add = " SKIP LOCKED"
}
ir := util.QueryTxRowSQL(tx, `SELECT id FROM state WHERE deleted_at IS NULL AND (locked_at IS NULL OR locked_at < NOW() - INTERVAL ? SECOND) AND (input = 'mysql' OR snapshotted_at IS NULL OR need_snapshot OR (params->"$.Schedule.Interval" > 0 AND snapshotted_at + INTERVAL params->"$.Schedule.Interval" SECOND < NOW())) LIMIT 1 FOR UPDATE`+add, lockTimeout/time.Second)
err = ir.Scan(&id)
if err != nil && err == sql.ErrNoRows {
return nil, nil // no tasks
}
if log.E(err) {
return nil, err
}
r, err := util.QueryTxSQL(tx, stateSQL+` WHERE state.id=?`, id)
if log.E(err) {
return nil, err
}
srow, err := parseRows(r)
if log.E(err) {
return nil, err
}
if len(srow) == 0 {
return nil, nil // no tasks
}
row := &srow[0]
tm := time.Now().Round(time.Second)
if row.TimeForSnapshot(tm) {
err = advanceSnapshottedAt(tx, row.ID, tm)
if log.E(err) {
return nil, err
}
row.NeedSnapshot = true
row.SnapshottedAt = tm
}
err = util.ExecTxSQL(tx, `UPDATE state SET locked_at=NOW(), worker_id=? WHERE id=?`, workerID, row.ID)
if log.E(err) {
return nil, err
}
err = tx.Commit()
if log.E(err) {
return nil, err
}
return row, nil
}