in internal/mode/chunk/chunk.go [181:255]
// Run executes the chunk mode. It parses the command-line flags and the JSON
// options, parses the connection parameters for the selected adapter, and then
// writes the indexer version info followed by the indexed chunk records through
// a stdout streamer.
//
// The options JSON is decoded strictly (unknown fields are rejected) and
// options.Timeout must be accepted by time.ParseDuration.
//
// Every failure is returned to the caller: flag/option/connection parsing
// errors, a connection whose concrete type does not match its adapter type,
// streaming errors, and an error from closing the streamer. Once the streamer
// has been created it is closed on every path.
func Run(buildOpts shared.BuildOpts) error {
	cmdOpts, err := parseFlags()
	if err != nil {
		return err
	}

	slog.Debug("parsing options", "optionsJSON", cmdOpts.OptionsJSON)
	options := Options{}
	decoder := json.NewDecoder(strings.NewReader(cmdOpts.OptionsJSON))
	decoder.DisallowUnknownFields()
	if err := decoder.Decode(&options); err != nil {
		return fmt.Errorf("failed to parse options: %w", err)
	}

	timeout, err := time.ParseDuration(options.Timeout)
	if err != nil {
		return fmt.Errorf("failed to parse Timeout: %v with error %w", options.Timeout, err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	slog.Debug("created ctx", "ctx", ctx)
	// TODO: Use context for all indexing/parsing operations.

	// Parse connection parameters
	conn, err := parseConnection(cmdOpts.AdapterType, cmdOpts.ConnectionJSON)
	if err != nil {
		return err
	}

	s := streamer.NewStdout()

	// All streaming happens inside this closure so that the streamer is closed
	// exactly once below, regardless of which step fails.
	runErr := func() error {
		if err := s.StreamSingle(&streamer.IndexerVersionInfo{
			Version:   buildOpts.Version,
			BuildTime: buildOpts.BuildTime,
		}); err != nil {
			return fmt.Errorf("failed to stream indexer version info: %w", err)
		}

		// Checked assertions: a mismatch between the reported adapter type and
		// the concrete connection type is reported as an error, not a panic.
		// NOTE(review): the connection values are logged as-is at debug level;
		// confirm they cannot expose credentials.
		switch adapterType := conn.GetAdapterType(); adapterType {
		case PostgreSQLAdapter:
			pgConn, ok := conn.(PostgreSQLConnection)
			if !ok {
				return fmt.Errorf("unexpected connection type %T for adapter %v", conn, adapterType)
			}
			slog.Debug("parsed postgresql connection", "connection", pgConn)
		case OpenSearchAdapter:
			osConn, ok := conn.(OpenSearchConnection)
			if !ok {
				return fmt.Errorf("unexpected connection type %T for adapter %v", conn, adapterType)
			}
			slog.Debug("parsed opensearch connection", "connection", osConn)
		case ElasticsearchAdapter:
			esConn, ok := conn.(ElasticsearchConnection)
			if !ok {
				return fmt.Errorf("unexpected connection type %T for adapter %v", conn, adapterType)
			}
			slog.Debug("parsed elasticsearch connection", "connection", esConn)
		}

		// Placeholder records; the IDs are hard-coded.
		if err := s.Stream([]streamer.Record{
			&streamer.IndexedChunkInfo{
				ID: "123",
			},
			&streamer.IndexedChunkInfo{
				ID: "567",
			},
		}); err != nil {
			return fmt.Errorf("failed to stream indexed chunk info: %w", err)
		}
		return nil
	}()

	if closeErr := s.Close(); closeErr != nil {
		if runErr != nil {
			// Keep both failures visible to errors.Is / errors.As.
			return fmt.Errorf("%w; failed to close streamer: %w", runErr, closeErr)
		}
		return fmt.Errorf("failed to close streamer: %w", closeErr)
	}
	return runErr
}