func Run()

in internal/mode/chunk/chunk.go [181:255]


// Run executes the chunk mode: it parses the command-line flags and the JSON
// options, resolves the adapter connection, and then emits the indexer version
// info followed by indexed-chunk records through a streamer created with
// streamer.NewStdout.
//
// The chunk records emitted at the moment are hard-coded placeholders, and the
// timeout-bound context is created but not yet passed to any operation (see
// the TODO below).
//
// Run returns an error when the flags, options, timeout, or connection cannot
// be parsed, when the parsed connection does not have the concrete type
// expected for its adapter, or when streaming to or closing the streamer
// fails. The result is named only so the deferred Close can report its error.
func Run(buildOpts shared.BuildOpts) (err error) {
	cmdOpts, err := parseFlags()
	if err != nil {
		return err
	}

	slog.Debug("parsing options", "optionsJSON", cmdOpts.OptionsJSON)
	options := Options{}
	decoder := json.NewDecoder(strings.NewReader(cmdOpts.OptionsJSON))
	// Reject unknown keys so a misspelled option fails instead of being ignored.
	decoder.DisallowUnknownFields()
	if decodeErr := decoder.Decode(&options); decodeErr != nil {
		return fmt.Errorf("failed to parse options: %w", decodeErr)
	}

	timeout, err := time.ParseDuration(options.Timeout)
	if err != nil {
		return fmt.Errorf("failed to parse Timeout: %v with error %w", options.Timeout, err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	slog.Debug("created ctx", "ctx", ctx)
	// TODO: Use context for all indexing/parsing operations.

	// Parse connection parameters
	conn, err := parseConnection(cmdOpts.AdapterType, cmdOpts.ConnectionJSON)
	if err != nil {
		return err
	}

	s := streamer.NewStdout()
	// Close the streamer on every exit path, including early returns after a
	// failed write. A close failure becomes the returned error unless an
	// earlier error is already being reported, in which case it is only logged
	// so the primary error is not masked.
	defer func() {
		closeErr := s.Close()
		if closeErr == nil {
			return
		}
		if err != nil {
			slog.Error("failed to close streamer", "error", closeErr)
			return
		}
		err = fmt.Errorf("failed to close streamer: %w", closeErr)
	}()

	streamErr := s.StreamSingle(&streamer.IndexerVersionInfo{
		Version:   buildOpts.Version,
		BuildTime: buildOpts.BuildTime,
	})
	if streamErr != nil {
		// Propagate instead of returning nil: the caller must not treat a run
		// that emitted nothing as successful.
		return fmt.Errorf("failed to stream indexer version info: %w", streamErr)
	}

	// NOTE(review): the connection values are logged in full at debug level;
	// confirm they cannot carry credentials, or redact them before logging.
	adapterType := conn.GetAdapterType()
	switch adapterType {
	case PostgreSQLAdapter:
		// Comma-ok assertion: a mismatch between the reported adapter type and
		// the concrete connection type is returned as an error, not a panic.
		pgConn, ok := conn.(PostgreSQLConnection)
		if !ok {
			return fmt.Errorf("unexpected connection type %T for adapter %v", conn, adapterType)
		}

		slog.Debug("parsed postgresql connection", "connection", pgConn)
	case OpenSearchAdapter:
		osConn, ok := conn.(OpenSearchConnection)
		if !ok {
			return fmt.Errorf("unexpected connection type %T for adapter %v", conn, adapterType)
		}

		slog.Debug("parsed opensearch connection", "connection", osConn)
	case ElasticsearchAdapter:
		esConn, ok := conn.(ElasticsearchConnection)
		if !ok {
			return fmt.Errorf("unexpected connection type %T for adapter %v", conn, adapterType)
		}

		slog.Debug("parsed elasticsearch connection", "connection", esConn)
	}

	// Placeholder records until real chunk indexing is wired in.
	streamErr = s.Stream([]streamer.Record{
		&streamer.IndexedChunkInfo{
			ID: "123",
		},
		&streamer.IndexedChunkInfo{
			ID: "567",
		},
	})
	if streamErr != nil {
		return fmt.Errorf("failed to stream indexed chunk info: %w", streamErr)
	}

	return nil
}