internal/mode/chunk/chunk.go (199 lines of code) (raw):

// Package chunk implements the indexer's "chunk" mode. It reads the -adapter,
// -options and -connection command-line flags, decodes the JSON payloads they
// carry, and streams records to stdout through the streamer package.
package chunk

import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"log/slog"
	"slices"
	"strings"
	"time"

	"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/mode/chunk/streamer"
	"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/shared"
)

// redactedPlaceholder replaces non-empty secret values in log output.
const redactedPlaceholder = "[REDACTED]"

// AdapterType represents the type of adapter to use.
type AdapterType string

// Adapter types accepted by the -adapter flag.
const (
	PostgreSQLAdapter    AdapterType = "postgresql"
	OpenSearchAdapter    AdapterType = "opensearch"
	ElasticsearchAdapter AdapterType = "elasticsearch"
)

// validAdapters lists every adapter accepted by parseFlags, in the order they
// are shown in help and error messages.
var validAdapters = [...]AdapterType{
	PostgreSQLAdapter,
	OpenSearchAdapter,
	ElasticsearchAdapter,
}

// Connection is the base interface for all connection types.
type Connection interface {
	GetAdapterType() AdapterType
}

// GitalyConfig is the "gitaly_config" object of the -options JSON payload.
type GitalyConfig struct {
	Address       string `json:"address"`
	Token         string `json:"token"`
	StorageName   string `json:"storage"`
	RelativePath  string `json:"relative_path"`
	ProjectPath   string `json:"project_path"`
	LimitFileSize int64  `json:"limit_file_size"`
	TokenVersion  int    `json:"token_version"`
}

// LogValue implements slog.LogValuer so that the Gitaly token is never
// written to the logs in clear text.
func (g GitalyConfig) LogValue() slog.Value {
	return slog.GroupValue(
		slog.String("address", g.Address),
		slog.String("token", redact(g.Token)),
		slog.String("storage", g.StorageName),
		slog.String("relative_path", g.RelativePath),
		slog.String("project_path", g.ProjectPath),
		slog.Int64("limit_file_size", g.LimitFileSize),
		slog.Int("token_version", g.TokenVersion),
	)
}

// Options is the decoded form of the -options JSON payload. Timeout must be a
// string accepted by time.ParseDuration.
type Options struct {
	ProjectID     uint64       `json:"project_id"`
	FromSHA       string       `json:"from_sha"`
	ToSHA         string       `json:"to_sha"`
	CorrelationID string       `json:"correlation_id"`
	SchemaVersion uint16       `json:"schema_version"`
	ForceReindex  bool         `json:"force_reindex"`
	ChunkSize     uint16       `json:"chunk_size"`
	ChunkOverlap  uint16       `json:"chunk_overlap"`
	GitalyConfig  GitalyConfig `json:"gitaly_config"`
	Timeout       string       `json:"timeout"`
}

// LogValue implements slog.LogValuer. It spells the fields out explicitly so
// that the nested GitalyConfig is logged through its own redacting LogValue
// instead of being formatted reflectively with the token visible.
func (o Options) LogValue() slog.Value {
	return slog.GroupValue(
		slog.Uint64("project_id", o.ProjectID),
		slog.String("from_sha", o.FromSHA),
		slog.String("to_sha", o.ToSHA),
		slog.String("correlation_id", o.CorrelationID),
		slog.Uint64("schema_version", uint64(o.SchemaVersion)),
		slog.Bool("force_reindex", o.ForceReindex),
		slog.Uint64("chunk_size", uint64(o.ChunkSize)),
		slog.Uint64("chunk_overlap", uint64(o.ChunkOverlap)),
		slog.Any("gitaly_config", o.GitalyConfig),
		slog.String("timeout", o.Timeout),
	)
}

// PostgreSQLConnection is the decoded -connection payload for the postgresql
// adapter.
type PostgreSQLConnection struct {
	Host     string `json:"host"`
	Port     uint16 `json:"port"`
	User     string `json:"user"`
	Password string `json:"password"`
	Database string `json:"database"`
	Table    string `json:"table"`
}

// LogValue implements slog.LogValuer so that the database password is never
// written to the logs in clear text.
func (c PostgreSQLConnection) LogValue() slog.Value {
	return slog.GroupValue(
		slog.String("host", c.Host),
		slog.Uint64("port", uint64(c.Port)),
		slog.String("user", c.User),
		slog.String("password", redact(c.Password)),
		slog.String("database", c.Database),
		slog.String("table", c.Table),
	)
}

// OpenSearchConnection is the decoded -connection payload for the opensearch
// adapter.
type OpenSearchConnection struct {
	// OpenSearch-specific connection parameters
}

// ElasticsearchConnection is the decoded -connection payload for the
// elasticsearch adapter.
type ElasticsearchConnection struct {
	// Elasticsearch-specific connection parameters
}

// GetAdapterType returns PostgreSQLAdapter.
func (c PostgreSQLConnection) GetAdapterType() AdapterType { return PostgreSQLAdapter }

// GetAdapterType returns OpenSearchAdapter.
func (c OpenSearchConnection) GetAdapterType() AdapterType { return OpenSearchAdapter }

// GetAdapterType returns ElasticsearchAdapter.
func (c ElasticsearchConnection) GetAdapterType() AdapterType { return ElasticsearchAdapter }

// redact hides a secret for logging. Empty values stay empty so a missing
// secret remains distinguishable from a present one.
func redact(secret string) string {
	if secret == "" {
		return ""
	}
	return redactedPlaceholder
}

// decodeConnection decodes connectionJSON into a value of type T, rejecting
// JSON fields that T does not declare. adapterType is only used to label the
// error. On failure the zero value of T is returned with the error.
func decodeConnection[T any](connectionJSON string, adapterType AdapterType) (T, error) {
	var result T

	decoder := json.NewDecoder(strings.NewReader(connectionJSON))
	decoder.DisallowUnknownFields()
	if err := decoder.Decode(&result); err != nil {
		// Do not hand back a partially decoded value alongside an error.
		var zero T
		return zero, fmt.Errorf("failed to parse %s connection: %w", adapterType, err)
	}

	return result, nil
}

// parseConnection parses the connection JSON string based on the adapter
// type. It returns a nil Connection together with an error when the adapter
// is unknown or the JSON cannot be decoded.
func parseConnection(adapterType AdapterType, connectionJSON string) (Connection, error) {
	switch adapterType {
	case PostgreSQLAdapter:
		result, err := decodeConnection[PostgreSQLConnection](connectionJSON, adapterType)
		if err != nil {
			return nil, err
		}
		return result, nil
	case OpenSearchAdapter:
		result, err := decodeConnection[OpenSearchConnection](connectionJSON, adapterType)
		if err != nil {
			return nil, err
		}
		return result, nil
	case ElasticsearchAdapter:
		result, err := decodeConnection[ElasticsearchConnection](connectionJSON, adapterType)
		if err != nil {
			return nil, err
		}
		return result, nil
	default:
		return nil, fmt.Errorf("unknown adapter: %s", adapterType)
	}
}

// isValidAdapter reports whether adapter is one of validAdapters.
func isValidAdapter(adapter AdapterType) bool {
	return slices.Contains(validAdapters[:], adapter)
}

// CommandOptions contains all the parsed command-line flags.
type CommandOptions struct {
	AdapterType    AdapterType
	OptionsJSON    string
	ConnectionJSON string
}

// parseFlags parses command line flags and validates them. It registers the
// -options, -adapter and -connection flags on the default flag set, requires
// all three to be non-empty, and requires -adapter to name a valid adapter.
func parseFlags() (CommandOptions, error) {
	var (
		opts       CommandOptions
		adapterStr string
	)

	validAdaptersList := make([]string, len(validAdapters))
	for i, adapter := range validAdapters {
		validAdaptersList[i] = string(adapter)
	}
	adapterChoices := strings.Join(validAdaptersList, ", ")

	flag.StringVar(&opts.OptionsJSON, "options", "", "JSON string of options for chunking")
	flag.StringVar(&adapterStr, "adapter", "", fmt.Sprintf("Adapter to use (%s)", adapterChoices))
	flag.StringVar(&opts.ConnectionJSON, "connection", "", "JSON string of connection parameters")
	flag.Parse()

	switch {
	case adapterStr == "":
		return CommandOptions{}, errors.New("-adapter flag is required")
	case opts.OptionsJSON == "":
		return CommandOptions{}, errors.New("-options flag is required")
	case opts.ConnectionJSON == "":
		return CommandOptions{}, errors.New("-connection flag is required")
	}

	opts.AdapterType = AdapterType(adapterStr)
	if !isValidAdapter(opts.AdapterType) {
		return CommandOptions{}, fmt.Errorf("invalid adapter: %s, must be one of: %s", opts.AdapterType, adapterChoices)
	}

	return opts, nil
}

// logConnection writes the parsed connection to the debug log. A type switch
// is used so an unexpected Connection implementation is skipped rather than
// causing a failed type assertion to panic.
func logConnection(conn Connection) {
	switch c := conn.(type) {
	case PostgreSQLConnection:
		slog.Debug("parsed postgresql connection", "connection", c)
	case OpenSearchConnection:
		slog.Debug("parsed opensearch connection", "connection", c)
	case ElasticsearchConnection:
		slog.Debug("parsed elasticsearch connection", "connection", c)
	}
}

// Run executes the chunk mode with the provided arguments and options.
//
// It parses the command-line flags, decodes the options and connection JSON,
// then streams the indexer version info followed by the indexed chunk records
// to stdout. An error is returned when the flags or JSON payloads are invalid,
// when the timeout is not a valid duration, or when streaming or closing the
// streamer fails.
func Run(buildOpts shared.BuildOpts) (err error) {
	cmdOpts, err := parseFlags()
	if err != nil {
		return err
	}

	var options Options
	decoder := json.NewDecoder(strings.NewReader(cmdOpts.OptionsJSON))
	decoder.DisallowUnknownFields()
	if err := decoder.Decode(&options); err != nil {
		return fmt.Errorf("failed to parse options: %w", err)
	}
	// Log the decoded options rather than the raw JSON: the raw payload
	// carries the Gitaly token, which Options.LogValue redacts.
	slog.Debug("parsed options", "options", options)

	timeout, err := time.ParseDuration(options.Timeout)
	if err != nil {
		return fmt.Errorf("failed to parse Timeout: %v with error %w", options.Timeout, err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	slog.Debug("created ctx", "ctx", ctx)
	// TODO: Use context for all indexing/parsing operations.

	// Parse connection parameters
	conn, err := parseConnection(cmdOpts.AdapterType, cmdOpts.ConnectionJSON)
	if err != nil {
		return err
	}

	s := streamer.NewStdout()
	// Close on every exit path, including early returns, and surface a close
	// failure to the caller without masking an earlier error.
	defer func() {
		if closeErr := s.Close(); closeErr != nil {
			err = errors.Join(err, fmt.Errorf("failed to close streamer: %w", closeErr))
		}
	}()

	versionInfo := &streamer.IndexerVersionInfo{
		Version:   buildOpts.Version,
		BuildTime: buildOpts.BuildTime,
	}
	if err := s.StreamSingle(versionInfo); err != nil {
		return fmt.Errorf("failed to stream indexer version info: %w", err)
	}

	logConnection(conn)

	// NOTE(review): these IDs are hard-coded placeholders; presumably real
	// chunk records will replace them once indexing is implemented.
	records := []streamer.Record{
		&streamer.IndexedChunkInfo{ID: "123"},
		&streamer.IndexedChunkInfo{ID: "567"},
	}
	if err := s.Stream(records); err != nil {
		return fmt.Errorf("failed to stream indexed chunk info: %w", err)
	}

	return nil
}