pkg/datasource/sql/util/sql.go (182 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ // Copyright 2011 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Package sql provides a generic interface around SQL (or SQL-like) // databases. // // The sql package must be used in conjunction with a database driver. // See https://golang.org/s/sqldrivers for a list of drivers. // // Drivers that do not support context cancellation will not return until // after the query is completed. // // For usage examples, see the wiki page at // https://golang.org/s/sqlwiki. package util import ( "context" "database/sql/driver" "errors" "fmt" "io" "reflect" "sync" ) // ScanRows is the result of a query. Its cursor starts before the first row // of the result set. Use Next to advance from row to row. type ScanRows struct { //dc *driverConn // owned; must call releaseConn when closed to release dc driver.Conn releaseConn func(error) rowsi driver.Rows cancel func() // called when ScanRows is closed, may be nil. //closeStmt *driverStmt // if non-nil, statement to Close on close // closemu prevents ScanRows from closing while there // is an active streaming result. It is held for read during non-close operations // and exclusively during close. // // closemu guards lasterr and closed. closemu sync.RWMutex closed bool lasterr error // non-nil only if closed is true // lastcols is only used in Scan, Next, and NextResultSet which are expected // not to be called concurrently. lastcols []driver.Value } func NewScanRows(rowsi driver.Rows) *ScanRows { return &ScanRows{rowsi: rowsi} } // lasterrOrErrLocked returns either lasterr or the provided err. // rs.closemu must be read-locked. func (rs *ScanRows) lasterrOrErrLocked(err error) error { if rs.lasterr != nil && rs.lasterr != io.EOF { return rs.lasterr } return err } // bypassRowsAwaitDone is only used for testing. // If true, it will not close the ScanRows automatically from the context. var bypassRowsAwaitDone = false func (rs *ScanRows) initContextClose(ctx, txctx context.Context) { if ctx.Done() == nil && (txctx == nil || txctx.Done() == nil) { return } if bypassRowsAwaitDone { return } ctx, rs.cancel = context.WithCancel(ctx) go rs.awaitDone(ctx, txctx) } // awaitDone blocks until either ctx or txctx is canceled. The ctx is provided // from the query context and is canceled when the query ScanRows is closed. // If the query was issued in a transaction, the transaction's context // is also provided in txctx to ensure ScanRows is closed if the Tx is closed. func (rs *ScanRows) awaitDone(ctx, txctx context.Context) { var txctxDone <-chan struct{} if txctx != nil { txctxDone = txctx.Done() } select { case <-ctx.Done(): case <-txctxDone: } rs.close(ctx.Err()) } // Next prepares the next result row for reading with the Scan method. It // returns true on success, or false if there is no next result row or an error // happened while preparing it. Err should be consulted to distinguish between // the two cases. // // Every call to Scan, even the first one, must be preceded by a call to Next. func (rs *ScanRows) Next() bool { var doClose, ok bool withLock(rs.closemu.RLocker(), func() { doClose, ok = rs.nextLocked() }) if doClose { rs.Close() } return ok } func (rs *ScanRows) nextLocked() (doClose, ok bool) { if rs.closed { return false, false } // Lock the driver connection before calling the driver interface // rowsi to prevent a Tx from rolling back the connection at the same time. //rs.dc.Lock() //defer rs.dc.Unlock() if rs.lastcols == nil { rs.lastcols = make([]driver.Value, len(rs.rowsi.Columns())) } rs.lasterr = rs.rowsi.Next(rs.lastcols) if rs.lasterr != nil { // Close the connection if there is a driver error. if rs.lasterr != io.EOF { return true, false } nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet) if !ok { return true, false } // The driver is at the end of the current result set. // Test to see if there is another result set after the current one. // Only close ScanRows if there is no further result sets to read. if !nextResultSet.HasNextResultSet() { doClose = true } return doClose, false } return false, true } // NextResultSet prepares the next result set for reading. It reports whether // there is further result sets, or false if there is no further result set // or if there is an error advancing to it. The Err method should be consulted // to distinguish between the two cases. // // After calling NextResultSet, the Next method should always be called before // scanning. If there are further result sets they may not have rows in the result // set. func (rs *ScanRows) NextResultSet() bool { var doClose bool defer func() { if doClose { rs.Close() } }() rs.closemu.RLock() defer rs.closemu.RUnlock() if rs.closed { return false } rs.lastcols = nil nextResultSet, ok := rs.rowsi.(driver.RowsNextResultSet) if !ok { doClose = true return false } // Lock the driver connection before calling the driver interface // rowsi to prevent a Tx from rolling back the connection at the same time. //rs.dc.Lock() //defer rs.dc.Unlock() rs.lasterr = nextResultSet.NextResultSet() if rs.lasterr != nil { doClose = true return false } return true } // Err returns the error, if any, that was encountered during iteration. // Err may be called after an explicit or implicit Close. func (rs *ScanRows) Err() error { rs.closemu.RLock() defer rs.closemu.RUnlock() return rs.lasterrOrErrLocked(nil) } var errRowsClosed = errors.New("sql: ScanRows are closed") // Scan copies the columns in the current row into the values pointed // at by dest. The number of values in dest must be the same as the // number of columns in ScanRows. // // Scan converts columns read from the database into the following // common Go types and special types provided by the sql package: // // *string // *[]byte // *int, *int8, *int16, *int32, *int64 // *uint, *uint8, *uint16, *uint32, *uint64 // *bool // *float32, *float64 // *interface{} // *RawBytes // *ScanRows (cursor value) // any type implementing Scanner (see Scanner docs) // // In the most simple case, if the type of the value from the source // column is an integer, bool or string type T and dest is of type *T, // Scan simply assigns the value through the pointer. // // Scan also converts between string and numeric types, as long as no // information would be lost. While Scan stringifies all numbers // scanned from numeric database columns into *string, scans into // numeric types are checked for overflow. For example, a float64 with // value 300 or a string with value "300" can scan into a uint16, but // not into a uint8, though float64(255) or "255" can scan into a // uint8. One exception is that scans of some float64 numbers to // strings may lose information when stringifying. In general, scan // floating point columns into *float64. // // If a dest argument has type *[]byte, Scan saves in that argument a // copy of the corresponding data. The copy is owned by the caller and // can be modified and held indefinitely. The copy can be avoided by // using an argument of type *RawBytes instead; see the documentation // for RawBytes for restrictions on its use. // // If an argument has type *interface{}, Scan copies the value // provided by the underlying driver without conversion. When scanning // from a source value of type []byte to *interface{}, a copy of the // slice is made and the caller owns the result. // // Source values of type time.Time may be scanned into values of type // *time.Time, *interface{}, *string, or *[]byte. When converting to // the latter two, time.RFC3339Nano is used. // // Source values of type bool may be scanned into types *bool, // *interface{}, *string, *[]byte, or *RawBytes. // // For scanning into *bool, the source may be true, false, 1, 0, or // string inputs parseable by strconv.ParseBool. // // Scan can also convert a cursor returned from a query, such as // "select cursor(select * from my_table) from dual", into a // *ScanRows value that can itself be scanned from. The parent // select query will close any cursor *ScanRows if the parent *ScanRows is closed. // // If any of the first arguments implementing Scanner returns an error, // that error will be wrapped in the returned error func (rs *ScanRows) Scan(dest ...interface{}) error { rs.closemu.RLock() if rs.lasterr != nil && rs.lasterr != io.EOF { rs.closemu.RUnlock() return rs.lasterr } if rs.closed { err := rs.lasterrOrErrLocked(errRowsClosed) rs.closemu.RUnlock() return err } rs.closemu.RUnlock() if rs.lastcols == nil { return errors.New("sql: Scan called without calling Next") } if len(dest) != len(rs.lastcols) { return fmt.Errorf("sql: expected %d destination arguments in Scan, not %d", len(rs.lastcols), len(dest)) } for i, sv := range rs.lastcols { if sv == nil { continue } // the type of dest may be NullString, NullInt64, int64, etc, we should call its Scan() ty := reflect.TypeOf(dest[i]) fn, ok := ty.MethodByName("Scan") if !ok { err := convertAssignRows(dest[i], sv, rs) if err != nil { return fmt.Errorf(`sql: Scan error on column index %d, name %q: %w`, i, rs.rowsi.Columns()[i], err) } } else { res := fn.Func.Call([]reflect.Value{reflect.ValueOf(dest[i]), reflect.ValueOf(sv)}) if len(res) > 0 && !res[0].IsNil() { return fmt.Errorf(`sql: Scan error on column index %d, name %q: %v`, i, rs.rowsi.Columns()[i], res[0].Elem().String()) } } } return nil } // rowsCloseHook returns a function so tests may install the // hook through a test only mutex. var rowsCloseHook = func() func(*ScanRows, *error) { return nil } // Close closes the ScanRows, preventing further enumeration. If Next is called // and returns false and there are no further result sets, // the ScanRows are closed automatically and it will suffice to check the // result of Err. Close is idempotent and does not affect the result of Err. func (rs *ScanRows) Close() error { //return rs.close(nil) return nil } func (rs *ScanRows) close(err error) error { rs.closemu.Lock() defer rs.closemu.Unlock() if rs.closed { return nil } rs.closed = true if rs.lasterr == nil { rs.lasterr = err } err = rs.rowsi.Close() //withLock(rs.dc, func() { // err = rs.rowsi.Close() //}) if fn := rowsCloseHook(); fn != nil { fn(rs, &err) } if rs.cancel != nil { rs.cancel() } //if rs.closeStmt != nil { // rs.closeStmt.Close() //} rs.releaseConn(err) return err } // withLock runs while holding lk. func withLock(lk sync.Locker, fn func()) { lk.Lock() defer lk.Unlock() // in case fn panics fn() }