func()

in plugins/storage/rdbms/events.go [232:304]


// GetTestEvents reads the test events selected by eventQuery from the
// test_events table and returns them as a slice of testevent.Event.
//
// Before querying, pending events are flushed via r.flushTestEvents so that
// the read observes them; r.lockTx/r.unlockTx bracket the query and the row
// iteration. The WHERE clause and its bind parameters are produced by
// buildTestEventQuery from eventQuery.
//
// On success the returned slice is never nil (it is empty when no rows
// match). On any failure — flush, query construction, query execution, row
// scanning, or row iteration — a nil slice and a non-nil error are returned.
func (r *RDBMS) GetTestEvents(ctx xcontext.Context, eventQuery *testevent.Query) ([]testevent.Event, error) {

	// Flush pending events before Get operations
	if err := r.flushTestEvents(); err != nil {
		return nil, fmt.Errorf("could not flush events before reading events: %v", err)
	}

	r.lockTx()
	defer r.unlockTx()

	baseQuery := bytes.Buffer{}
	baseQuery.WriteString("select event_id, job_id, run_id, test_name, test_step_label, event_name, target_id, payload, emit_time from test_events")
	query, fields, err := buildTestEventQuery(baseQuery, eventQuery)
	if err != nil {
		return nil, fmt.Errorf("could not execute select query for test events: %v", err)
	}

	results := []testevent.Event{}
	ctx.Debugf("Executing query: %s, fields: %v", query, fields)
	rows, err := r.db.Query(query, fields...)
	if err != nil {
		return nil, err
	}
	// A failure to close the rows is only logged: by then the result (or the
	// error) to hand back to the caller has already been decided.
	defer func() {
		if closeErr := rows.Close(); closeErr != nil {
			ctx.Warnf("could not close rows for test events: %v", closeErr)
		}
	}()

	for rows.Next() {
		data := testevent.Data{}
		header := testevent.Header{}
		// event is built around pointers to header and data, which are
		// populated by the Scan call and the null checks below.
		event := testevent.New(&header, &data)

		var (
			// event_id is part of the select list, so it needs a Scan
			// destination even though it is not copied into the event.
			eventID int
			// target_id and payload might be null, so types which support
			// null are used with Scan.
			targetID sql.NullString
			payload  sql.NullString
		)
		if err := rows.Scan(
			&eventID,
			&header.JobID,
			&header.RunID,
			&header.TestName,
			&header.TestStepLabel,
			&data.EventName,
			&targetID,
			&payload,
			&event.EmitTime,
		); err != nil {
			return nil, fmt.Errorf("could not read results from db: %v", err)
		}

		if targetID.Valid {
			t := target.Target{ID: targetID.String}
			data.Target = &t
		}

		if payload.Valid {
			rawPayload := json.RawMessage(payload.String)
			data.Payload = &rawPayload
		}

		results = append(results, event)
	}

	// rows.Next returns false both when the result set is exhausted and when
	// iteration fails part-way; rows.Err tells the two apart. Without this
	// check a failed iteration would be reported as a successful, truncated
	// result.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("could not iterate over test event rows: %v", err)
	}
	return results, nil
}