systemtest/gencorpora/catbulk.go (180 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package gencorpora

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"time"

	"github.com/elastic/go-elasticsearch/v8/esutil"
)

// CatBulkServer wraps an HTTP server and a listener to listen
// for ES requests on any available port.
type CatBulkServer struct {
	// listener is the TCP listener the HTTP server is served on.
	listener net.Listener
	// server is the HTTP server whose handler records bulk requests.
	server *http.Server
	// Addr is the address the listener is bound to.
	Addr string
	// writer receives the bulk request bodies; it is closed by Stop.
	writer io.WriteCloser

	// metaUpdateChan carries per-request document statistics from the
	// request handler to the metadata writer goroutine.
	metaUpdateChan chan docsStat
	// metaWriteDone is closed once the metadata writer has returned.
	metaWriteDone chan struct{}
}

// docsStat represents statistics of ES docs generated by a request.
type docsStat struct {
	// count is the number of tokens (action-and-metadata plus source
	// pairs) written for the request.
	count int
	// bytes is the total number of bytes written for the request.
	bytes int
}

// NewCatBulkServer returns a CatBulkServer which can serve as a fake ES
// server. It listens on any available TCP port and writes the bodies of
// the bulk requests it receives to the file at gencorporaConfig.CorporaPath.
func NewCatBulkServer() (*CatBulkServer, error) { listener, err := net.Listen("tcp", ":0") if err != nil { return nil, err } writer, err := os.Create(gencorporaConfig.CorporaPath) if err != nil { return nil, err } addr := listener.Addr().String() metaUpdateChan := make(chan docsStat) return &CatBulkServer{ listener: listener, Addr: addr, server: &http.Server{ Addr: addr, Handler: handleReq(metaUpdateChan, writer), }, writer: writer, metaUpdateChan: metaUpdateChan, metaWriteDone: make(chan struct{}), }, nil } // Serve starts the fake ES server on a listener. func (s *CatBulkServer) Serve() error { go func() { if err := s.metaWriter(); err != nil { log.Println("failed to write metadata", err) } }() if err := s.server.Serve(s.listener); err != nil && err != http.ErrServerClosed { return err } return nil } // Stop initiates graceful shutdown the underlying HTTP server and writes // generated corpus metadata on successful shutdown. func (s *CatBulkServer) Stop() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() defer s.writer.Close() if err := s.server.Shutdown(ctx); err != nil { return fmt.Errorf("failed to shutdown cat bulk server no metadata written: %w", err) } close(s.metaUpdateChan) <-s.metaWriteDone return nil } func (s *CatBulkServer) metaWriter() error { defer close(s.metaWriteDone) metadata := struct { SourceFile string `json:"source-file"` DocumentCount int `json:"document-count"` UncompressedBytes int `json:"uncompressed-bytes"` IncludedsActionAndMetadata bool `json:"includes-action-and-meta-data"` }{ SourceFile: gencorporaConfig.CorporaPath, IncludedsActionAndMetadata: true, } // Overriding meta source root dir allows for creating corpora // and metadata with a custom root dir. This option will mostly // be useful for creating corpora to be used on remote setups. 
if gencorporaConfig.OverrideMetaSourceRootDir != "" { filename := filepath.Base(gencorporaConfig.CorporaPath) metadata.SourceFile = filepath.Join( gencorporaConfig.OverrideMetaSourceRootDir, filename, ) } // update metadata as request is received by the server for stat := range s.metaUpdateChan { metadata.DocumentCount += stat.count metadata.UncompressedBytes += stat.bytes } // write metadata to a file metadataBytes, err := json.MarshalIndent(metadata, "", " ") if err != nil { return err } writer, err := os.Create(gencorporaConfig.MetadataPath) defer writer.Close() if _, err := writer.Write(metadataBytes); err != nil { return err } return nil } func handleReq(metaUpdateChan chan docsStat, writer io.Writer) http.HandlerFunc { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { w.Header().Set("X-Elastic-Product", "Elasticsearch") switch req.Method { case http.MethodGet: w.WriteHeader(http.StatusOK) w.Write([]byte(`{"cluster_uuid": "cat_bulk"}`)) case http.MethodPost: reader := req.Body defer req.Body.Close() if encoding := req.Header.Get("Content-Encoding"); encoding == "gzip" { var err error reader, err = gzip.NewReader(reader) if err != nil { log.Println("failed to read request body", err) w.WriteHeader(http.StatusBadRequest) return } defer reader.Close() } mockResp := esutil.BulkIndexerResponse{} scanner := bufio.NewScanner(reader) scanner.Split(splitMetadataAndSource) var stat docsStat for scanner.Scan() { n, err := writer.Write(scanner.Bytes()) if err != nil { // Discard the request without processing further log.Println("failed to write ES corpora to a file", err) w.WriteHeader(http.StatusInternalServerError) return } stat.count++ stat.bytes += n item := map[string]esutil.BulkIndexerResponseItem{ "action": {Status: http.StatusOK}, } mockResp.Items = append(mockResp.Items, item) } if err := scanner.Err(); err != nil { log.Println("failed to read ES corpora", err) w.WriteHeader(http.StatusBadRequest) return } // Update metadata with the ES 
document statistics generated by this request metaUpdateChan <- stat resp, err := json.Marshal(mockResp) if err != nil { log.Println("failed to encode response to JSON", err) w.WriteHeader(http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) w.Write(resp) default: w.WriteHeader(http.StatusNotFound) } }) } // splitMetadataAndSource splits the input ES corpora expecting each corpus to have // action-and-metdata line followed by source document in an ndjson format. The EOL // markers are preserved and included in the token. func splitMetadataAndSource(data []byte, atEOF bool) (advance int, token []byte, err error) { if atEOF && len(data) == 0 { return 0, nil, nil } if i := bytes.IndexByte(data, '\n'); i >= 0 { // This represents metadata EOL marker // Try to find the source EOL marker if len(data) > i+1 { if j := bytes.IndexByte(data[i+1:], '\n'); j >= 0 { // This represents source EOL marker return i + j + 2, data[:i+j+2], nil } } } // At EOF the scanner will be in one of the following state: // 1. We don't have both action and metadata for atleast one document // 2. We have a final non-terminated line // We can return the data as is in both cases. Case 1 may represents input doc // to not be as metadata and action but is left to be handled by the consumer of // the generated corpus. if atEOF { return len(data), data, nil } // Request more data. return 0, nil, nil }