func RunPgx()

in benchmarks/latency-comparison/golang/runners/pgx_runner.go [27:88]


func RunPgx(database, sql string, readWrite bool, numOperations, numClients, wait int, host string, port int, useUnixSocket bool) ([]float64, error) {
	ctx := context.Background()
	var err error

	// Connect to Cloud Spanner through PGAdapter.
	var connString string
	if useUnixSocket {
		connString = fmt.Sprintf("host=%s port=%d database=%s", host, port, database)
	} else {
		connString = fmt.Sprintf("postgres://uid:pwd@%s:%d/%s?sslmode=disable", host, port, url.QueryEscape(database))
	}
	conns := make([]*pgx.Conn, numClients)
	for c := 0; c < numClients; c++ {
		conns[c], err = pgx.Connect(ctx, connString)
		if err != nil {
			return nil, err
		}
		defer conns[c].Close(ctx)
	}

	// Run one query to warm up.
	if readWrite {
		if _, err := executePgxUpdate(ctx, conns[0], sql); err != nil {
			return nil, err
		}
	} else {
		if _, err := executePgxQuery(ctx, conns[0], sql); err != nil {
			return nil, err
		}
	}

	var ops atomic.Int64
	var finished atomic.Bool
	totalOps := numOperations * numClients
	runTimes := make([]float64, totalOps)
	wg := sync.WaitGroup{}
	wg.Add(numClients)
	for c := 0; c < numClients; c++ {
		clientIndex := c
		go func() error {
			defer wg.Done()
			for n := 0; n < numOperations; n++ {
				randWait(wait)
				if readWrite {
					runTimes[clientIndex*numOperations+n], err = executePgxUpdate(ctx, conns[clientIndex], sql)
				} else {
					runTimes[clientIndex*numOperations+n], err = executePgxQuery(ctx, conns[clientIndex], sql)
				}
				if err != nil {
					return err
				}
				ops.Add(1)
			}
			return nil
		}()
	}
	printProgress(&finished, &ops, totalOps)
	wg.Wait()
	finished.Store(true)
	fmt.Printf("\r%d/%d\n\n", ops.Load(), totalOps)
	return runTimes, nil
}