in internal/benchrunner/runners/rally/runner.go [947:1059]
// reindexData copies the documents of the runtime data stream into a
// dedicated index of the metricstore.
//
// It is a no-op when the ReindexData option is disabled, and it fails when the
// option is enabled but no metricstore client (ESMetricsAPI) is configured.
// The steps are:
//
//  1. Read the mapping of r.runtimeDataStream through ESAPI; the response must
//     contain exactly one mapping entry.
//  2. Create "bench-reindex-<data stream>-<run ID>" through ESMetricsAPI with
//     that mapping and zero replicas.
//  3. Run a scrolled match_all search (pages of 10000 hits, one minute
//     keep-alive) on the data stream and hand every non-empty page to
//     r.bulkMetrics. The first empty page ends the operation.
//
// NOTE(review): res is assigned only once, by the initial Search call, and
// bulkMetrics receives the decoded page by value, so every loop iteration
// decodes from the same response body. After a non-empty first page the next
// Decode reads an already consumed body, which looks like it ends the function
// with a decoding error (io.EOF) instead of processing the next page. Confirm
// where the follow-up Scroll response is supposed to be handed back to this
// loop (bulkMetrics is not visible from here).
func (r *runner) reindexData(ctx context.Context) error {
	if !r.options.ReindexData {
		return nil
	}
	if r.options.ESMetricsAPI == nil {
		return errors.New("the option to reindex data is set, but the metricstore was not initialized")
	}

	logger.Debug("starting reindexing of data...")
	logger.Debug("getting original mappings...")

	// Step 1: fetch the mapping of the source data stream.
	getMapping := r.options.ESAPI.Indices.GetMapping
	mappingRes, err := getMapping(
		getMapping.WithContext(ctx),
		getMapping.WithIndex(r.runtimeDataStream),
	)
	if err != nil {
		return fmt.Errorf("error getting mapping: %w", err)
	}
	defer mappingRes.Body.Close()
	if mappingRes.IsError() {
		return fmt.Errorf("error getting mapping: %s", mappingRes)
	}

	rawMappings, err := io.ReadAll(mappingRes.Body)
	if err != nil {
		return fmt.Errorf("error reading mapping body: %w", err)
	}

	// The response is keyed by index name; only the "mappings" object of each
	// entry is kept, still encoded, so it can be forwarded verbatim.
	byIndex := map[string]struct {
		Mappings json.RawMessage
	}{}
	if err := json.Unmarshal(rawMappings, &byIndex); err != nil {
		return fmt.Errorf("error unmarshaling mappings: %w", err)
	}
	if count := len(byIndex); count != 1 {
		return fmt.Errorf("exactly 1 mapping was expected, got %d", count)
	}

	// There is a single entry at this point; ranging is just the way to reach
	// it without knowing its key.
	var sourceMapping string
	for _, entry := range byIndex {
		sourceMapping = string(entry.Mappings)
	}

	// Step 2: create the destination index in the metricstore.
	createBody := fmt.Sprintf(`{
"settings": {"number_of_replicas":0},
"mappings": %s
}`, sourceMapping)

	indexName := fmt.Sprintf("bench-reindex-%s-%s", r.runtimeDataStream, r.svcInfo.Test.RunID)
	logger.Debugf("creating %s index in metricstore...", indexName)

	createIndex := r.options.ESMetricsAPI.Indices.Create
	createRes, err := createIndex(
		indexName,
		createIndex.WithContext(ctx),
		createIndex.WithBody(bytes.NewReader([]byte(createBody))),
	)
	if err != nil {
		return fmt.Errorf("could not create index: %w", err)
	}
	defer createRes.Body.Close()
	if createRes.IsError() {
		return fmt.Errorf("got a response error while creating index: %s", createRes)
	}

	// Step 3: open a scrolled search over every document of the data stream.
	matchAll := strings.NewReader(`{"query":{"match_all":{}}}`)
	logger.Debug("starting scrolling of events...")

	search := r.options.ESAPI.Search
	res, err := search(
		search.WithContext(ctx),
		search.WithIndex(r.runtimeDataStream),
		search.WithBody(matchAll),
		search.WithScroll(time.Minute),
		search.WithSize(10000),
	)
	if err != nil {
		return fmt.Errorf("error executing search: %w", err)
	}
	defer res.Body.Close()
	if res.IsError() {
		return fmt.Errorf("error executing search: %s", res)
	}

	// Decode pages and forward them until an empty page is seen.
	for {
		var page searchResponse
		if err := json.NewDecoder(res.Body).Decode(&page); err != nil {
			return fmt.Errorf("error decoding search response: %w", err)
		}
		if page.Error != nil {
			return fmt.Errorf("error searching for documents: %s", page.Error.Reason)
		}
		if len(page.Hits) == 0 {
			logger.Debug("reindexing operation finished")
			return nil
		}
		if err := r.bulkMetrics(ctx, indexName, page); err != nil {
			return err
		}
	}
}