internal/mode/chunk/chunk.go (199 lines of code) (raw):
package chunk
import (
"context"
"encoding/json"
"flag"
"fmt"
"log/slog"
"strings"
"time"
"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/mode/chunk/streamer"
"gitlab.com/gitlab-org/gitlab-elasticsearch-indexer/internal/shared"
)
// AdapterType represents the type of adapter to use.
//
// Its value comes verbatim from the -adapter command-line flag and selects
// which connection struct the -connection JSON is decoded into.
type AdapterType string
// Accepted values for the -adapter flag.
const (
PostgreSQLAdapter AdapterType = "postgresql"
OpenSearchAdapter AdapterType = "opensearch"
ElasticsearchAdapter AdapterType = "elasticsearch"
)
// validAdapters is the allow-list of adapter types. Flag validation checks
// membership against it and the -adapter help/error text is built from it,
// so a new adapter must be listed here to be accepted.
var validAdapters = [...]AdapterType{
PostgreSQLAdapter,
OpenSearchAdapter,
ElasticsearchAdapter,
}
// Connection is the base interface for all connection types.
//
// The concrete connection structs in this package (PostgreSQLConnection,
// OpenSearchConnection, ElasticsearchConnection) implement it, so a decoded
// connection can be passed around generically and type-asserted back to its
// concrete type when adapter-specific fields are needed.
type Connection interface {
// GetAdapterType returns the adapter these connection parameters belong to.
GetAdapterType() AdapterType
}
// GitalyConfig is the "gitaly_config" section of Options.
//
// NOTE(review): Token (and presumably TokenVersion) look like authentication
// data for Gitaly — keep this struct out of logs. The per-field notes below
// are inferred from names/JSON keys and should be confirmed.
type GitalyConfig struct {
// Address is presumably the Gitaly server address — TODO confirm format.
Address string `json:"address"`
Token string `json:"token"`
// StorageName and RelativePath presumably locate the repository on Gitaly.
StorageName string `json:"storage"`
RelativePath string `json:"relative_path"`
ProjectPath string `json:"project_path"`
// LimitFileSize is presumably a per-file size cap; units are not shown
// here (bytes?) — TODO confirm.
LimitFileSize int64 `json:"limit_file_size"`
TokenVersion int `json:"token_version"`
}
// Options is the payload of the -options flag. Run decodes it with unknown
// fields disallowed, so every key in the JSON must correspond to a field here.
type Options struct {
ProjectID uint64 `json:"project_id"`
// FromSHA and ToSHA presumably bound the commit range to chunk — TODO
// confirm how an empty FromSHA is treated.
FromSHA string `json:"from_sha"`
ToSHA string `json:"to_sha"`
CorrelationID string `json:"correlation_id"`
SchemaVersion uint16 `json:"schema_version"`
ForceReindex bool `json:"force_reindex"`
// ChunkSize and ChunkOverlap: units are not visible here (bytes? lines?
// tokens?) — TODO confirm.
ChunkSize uint16 `json:"chunk_size"`
ChunkOverlap uint16 `json:"chunk_overlap"`
GitalyConfig GitalyConfig `json:"gitaly_config"`
// Timeout is a Go duration string such as "30s". Run parses it with
// time.ParseDuration, so an empty or malformed value is an error.
Timeout string `json:"timeout"`
}
// PostgreSQLConnection holds the -connection parameters for the "postgresql"
// adapter.
//
// Password is a credential. LogValue redacts it, so the struct can be handed
// to log/slog without leaking the secret.
type PostgreSQLConnection struct {
	Host     string `json:"host"`
	Port     uint16 `json:"port"`
	User     string `json:"user"`
	Password string `json:"password"`
	Database string `json:"database"`
	Table    string `json:"table"`
}

// LogValue implements slog.LogValuer.
//
// It logs every field except Password as-is. Password is replaced by the
// marker "[REDACTED]" when set, or left empty when unset, so logs still show
// whether a password was supplied without revealing it.
func (c PostgreSQLConnection) LogValue() slog.Value {
	password := ""
	if c.Password != "" {
		password = "[REDACTED]"
	}
	return slog.GroupValue(
		slog.String("host", c.Host),
		slog.Any("port", c.Port),
		slog.String("user", c.User),
		slog.String("password", password),
		slog.String("database", c.Database),
		slog.String("table", c.Table),
	)
}
// OpenSearchConnection holds the -connection parameters for the "opensearch"
// adapter.
//
// NOTE(review): no fields are declared yet and decodeConnection disallows
// unknown fields, so any JSON object containing a key is rejected for this
// adapter until fields are added.
type OpenSearchConnection struct {
// OpenSearch-specific connection parameters
}
// ElasticsearchConnection holds the -connection parameters for the
// "elasticsearch" adapter.
//
// NOTE(review): like OpenSearchConnection it declares no fields yet, so any
// JSON object containing a key is rejected by the strict decoder.
type ElasticsearchConnection struct {
// Elasticsearch-specific connection parameters
}
// GetAdapterType reports PostgreSQLAdapter, satisfying Connection.
func (PostgreSQLConnection) GetAdapterType() AdapterType { return PostgreSQLAdapter }

// GetAdapterType reports OpenSearchAdapter, satisfying Connection.
func (OpenSearchConnection) GetAdapterType() AdapterType { return OpenSearchAdapter }

// GetAdapterType reports ElasticsearchAdapter, satisfying Connection.
func (ElasticsearchConnection) GetAdapterType() AdapterType { return ElasticsearchAdapter }
// decodeConnection strictly decodes connectionJSON into a value of type T.
//
// Decoding fails when the JSON contains a key that does not map to a field of
// T, or when anything other than JSON whitespace follows the first value (for
// example a second object or stray characters). A malformed -connection flag
// therefore fails loudly instead of being partially applied. adapterType is
// used only to label the error. On failure the zero T is returned, never a
// partially populated one.
func decodeConnection[T any](connectionJSON string, adapterType AdapterType) (T, error) {
	var zero, result T
	decoder := json.NewDecoder(strings.NewReader(connectionJSON))
	decoder.DisallowUnknownFields()
	if err := decoder.Decode(&result); err != nil {
		return zero, fmt.Errorf("failed to parse %s connection: %w", adapterType, err)
	}
	// Decode stops after the first JSON value. InputOffset is the end of that
	// value, so everything after it must be JSON whitespace only.
	if rest := strings.Trim(connectionJSON[decoder.InputOffset():], " \t\r\n"); rest != "" {
		return zero, fmt.Errorf("failed to parse %s connection: unexpected data after JSON value", adapterType)
	}
	return result, nil
}
// parseConnection decodes connectionJSON into the connection struct that
// matches adapterType and returns it behind the Connection interface.
//
// Decoding errors from decodeConnection are returned unchanged. An
// adapterType outside the known set yields an "unknown adapter" error. On any
// error the returned Connection is a nil interface.
func parseConnection(adapterType AdapterType, connectionJSON string) (Connection, error) {
	var conn Connection
	switch adapterType {
	case PostgreSQLAdapter:
		pg, err := decodeConnection[PostgreSQLConnection](connectionJSON, adapterType)
		if err != nil {
			return nil, err
		}
		conn = pg
	case OpenSearchAdapter:
		opensearch, err := decodeConnection[OpenSearchConnection](connectionJSON, adapterType)
		if err != nil {
			return nil, err
		}
		conn = opensearch
	case ElasticsearchAdapter:
		elastic, err := decodeConnection[ElasticsearchConnection](connectionJSON, adapterType)
		if err != nil {
			return nil, err
		}
		conn = elastic
	default:
		return nil, fmt.Errorf("unknown adapter: %s", adapterType)
	}
	return conn, nil
}
// isValidAdapter reports whether adapter exactly matches (case-sensitively)
// one of the entries in validAdapters.
func isValidAdapter(adapter AdapterType) bool {
	found := false
	for i := 0; i < len(validAdapters) && !found; i++ {
		found = validAdapters[i] == adapter
	}
	return found
}
// CommandOptions contains all the parsed command-line flags.
//
// When parseFlags succeeds, all three fields are non-empty and AdapterType is
// one of validAdapters. OptionsJSON and ConnectionJSON are still raw,
// undecoded JSON strings at this point.
type CommandOptions struct {
AdapterType AdapterType
OptionsJSON string
ConnectionJSON string
}
// parseFlags registers -options, -adapter and -connection on the global flag
// set, parses os.Args, and validates the result.
//
// All three flags are required. The adapter must be one of validAdapters. On
// any validation failure the zero CommandOptions is returned with an error
// naming the offending flag. Because it registers flags on the global set, it
// must be called at most once per process.
func parseFlags() (CommandOptions, error) {
	names := make([]string, 0, len(validAdapters))
	for _, a := range validAdapters {
		names = append(names, string(a))
	}
	allowed := strings.Join(names, ", ")

	var (
		adapter string
		parsed  CommandOptions
	)
	flag.StringVar(&parsed.OptionsJSON, "options", "", "JSON string of options for chunking")
	flag.StringVar(&adapter, "adapter", "", fmt.Sprintf("Adapter to use (%s)", allowed))
	flag.StringVar(&parsed.ConnectionJSON, "connection", "", "JSON string of connection parameters")
	flag.Parse()

	switch {
	case adapter == "":
		return CommandOptions{}, fmt.Errorf("-adapter flag is required")
	case parsed.OptionsJSON == "":
		return CommandOptions{}, fmt.Errorf("-options flag is required")
	case parsed.ConnectionJSON == "":
		return CommandOptions{}, fmt.Errorf("-connection flag is required")
	}

	parsed.AdapterType = AdapterType(adapter)
	if !isValidAdapter(parsed.AdapterType) {
		return CommandOptions{}, fmt.Errorf("invalid adapter: %s, must be one of: %s", parsed.AdapterType, allowed)
	}
	return parsed, nil
}
// Run executes the chunk mode with the provided arguments and options.
//
// It proceeds in order:
//  1. Parses and validates the command-line flags.
//  2. Strictly decodes the -options JSON; unknown fields and trailing data are
//     rejected.
//  3. Builds a context bounded by Options.Timeout.
//  4. Decodes the -connection JSON for the selected adapter.
//  5. Writes records to stdout through the streamer: first the indexer
//     version info, then the indexed chunk info.
//
// Failures to write or close the stream are returned as errors, so the caller
// cannot report success when output was lost. Secrets carried in the options
// (the Gitaly token) and in the connection (the database password) are never
// logged.
func Run(buildOpts shared.BuildOpts) error {
	cmdOpts, err := parseFlags()
	if err != nil {
		return err
	}

	options := Options{}
	decoder := json.NewDecoder(strings.NewReader(cmdOpts.OptionsJSON))
	decoder.DisallowUnknownFields()
	if err := decoder.Decode(&options); err != nil {
		return fmt.Errorf("failed to parse options: %w", err)
	}
	// Decode stops after the first JSON value; only whitespace may follow it.
	if rest := strings.Trim(cmdOpts.OptionsJSON[decoder.InputOffset():], " \t\r\n"); rest != "" {
		return fmt.Errorf("failed to parse options: unexpected data after JSON value")
	}
	// Log selected fields only: the raw JSON includes GitalyConfig.Token.
	slog.Debug("parsed options",
		"project_id", options.ProjectID,
		"from_sha", options.FromSHA,
		"to_sha", options.ToSHA,
		"correlation_id", options.CorrelationID,
		"schema_version", options.SchemaVersion,
		"force_reindex", options.ForceReindex,
		"timeout", options.Timeout,
	)

	timeout, err := time.ParseDuration(options.Timeout)
	if err != nil {
		return fmt.Errorf("failed to parse Timeout: %v with error %w", options.Timeout, err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	slog.Debug("created ctx", "ctx", ctx)
	// TODO: Use context for all indexing/parsing operations.

	// Parse connection parameters
	conn, err := parseConnection(cmdOpts.AdapterType, cmdOpts.ConnectionJSON)
	if err != nil {
		return err
	}

	s := streamer.NewStdout()
	// On error paths the streaming error is the one returned; a close failure
	// there is only logged so it does not mask the original cause.
	closeQuietly := func() {
		if err := s.Close(); err != nil {
			slog.Error("failed to close streamer", "error", err)
		}
	}

	if err := s.StreamSingle(&streamer.IndexerVersionInfo{
		Version:   buildOpts.Version,
		BuildTime: buildOpts.BuildTime,
	}); err != nil {
		closeQuietly()
		return fmt.Errorf("failed to stream indexer version info: %w", err)
	}

	// Log only non-secret connection fields; the PostgreSQL password must
	// never reach the logs.
	switch c := conn.(type) {
	case PostgreSQLConnection:
		slog.Debug("parsed postgresql connection",
			"host", c.Host,
			"port", c.Port,
			"user", c.User,
			"database", c.Database,
			"table", c.Table,
		)
	case OpenSearchConnection:
		slog.Debug("parsed opensearch connection", "connection", c)
	case ElasticsearchConnection:
		slog.Debug("parsed elasticsearch connection", "connection", c)
	}

	// NOTE(review): these IDs are hard-coded and independent of the input —
	// presumably placeholders until real chunking is wired in.
	if err := s.Stream([]streamer.Record{
		&streamer.IndexedChunkInfo{ID: "123"},
		&streamer.IndexedChunkInfo{ID: "567"},
	}); err != nil {
		closeQuietly()
		return fmt.Errorf("failed to stream indexed chunk info: %w", err)
	}

	if err := s.Close(); err != nil {
		return fmt.Errorf("failed to close streamer: %w", err)
	}
	return nil
}