in benchmarks/latency-comparison/golang/runners/pgx_runner.go [27:88]
func RunPgx(database, sql string, readWrite bool, numOperations, numClients, wait int, host string, port int, useUnixSocket bool) ([]float64, error) {
ctx := context.Background()
var err error
// Connect to Cloud Spanner through PGAdapter.
var connString string
if useUnixSocket {
connString = fmt.Sprintf("host=%s port=%d database=%s", host, port, database)
} else {
connString = fmt.Sprintf("postgres://uid:pwd@%s:%d/%s?sslmode=disable", host, port, url.QueryEscape(database))
}
conns := make([]*pgx.Conn, numClients)
for c := 0; c < numClients; c++ {
conns[c], err = pgx.Connect(ctx, connString)
if err != nil {
return nil, err
}
defer conns[c].Close(ctx)
}
// Run one query to warm up.
if readWrite {
if _, err := executePgxUpdate(ctx, conns[0], sql); err != nil {
return nil, err
}
} else {
if _, err := executePgxQuery(ctx, conns[0], sql); err != nil {
return nil, err
}
}
var ops atomic.Int64
var finished atomic.Bool
totalOps := numOperations * numClients
runTimes := make([]float64, totalOps)
wg := sync.WaitGroup{}
wg.Add(numClients)
for c := 0; c < numClients; c++ {
clientIndex := c
go func() error {
defer wg.Done()
for n := 0; n < numOperations; n++ {
randWait(wait)
if readWrite {
runTimes[clientIndex*numOperations+n], err = executePgxUpdate(ctx, conns[clientIndex], sql)
} else {
runTimes[clientIndex*numOperations+n], err = executePgxQuery(ctx, conns[clientIndex], sql)
}
if err != nil {
return err
}
ops.Add(1)
}
return nil
}()
}
printProgress(&finished, &ops, totalOps)
wg.Wait()
finished.Store(true)
fmt.Printf("\r%d/%d\n\n", ops.Load(), totalOps)
return runTimes, nil
}