_examples/xkcdsearch/cmd/xkcd/commands/index.go (243 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package commands import ( "encoding/json" "fmt" "math/rand" "net/http" "os" "regexp" "strconv" "sync" "time" "github.com/rs/zerolog" "github.com/spf13/cobra" "github.com/elastic/go-elasticsearch/v9" "github.com/elastic/go-elasticsearch/v9/_examples/xkcdsearch" ) var ( indexSetup bool crawlerWorkers int ) func init() { rootCmd.AddCommand(indexCmd) indexCmd.Flags().BoolVar(&indexSetup, "setup", false, "Create Elasticsearch index") indexCmd.Flags().IntVar(&crawlerWorkers, "workers", 25, "Number of concurrent workers") } var indexCmd = &cobra.Command{ Use: "index", Short: "Index xkcd.com into Elasticsearch", Run: func(cmd *cobra.Command, args []string) { crawler := Crawler{ log: zerolog.New(zerolog.ConsoleWriter{Out: os.Stdout}). Level(func() zerolog.Level { if os.Getenv("DEBUG") != "" { return zerolog.DebugLevel } else { return zerolog.InfoLevel } }()). With(). Timestamp(). Logger(), workers: crawlerWorkers, queue: make(chan string, crawlerWorkers), reURL: regexp.MustCompile(`https\://xkcd\.com/(?P<ID>\d+)/.*\.json`), nextURL: func(c *Crawler, u string) string { id, err := c.documentIDFromURL(u) if err != nil || id <= 1 { return "" } return fmt.Sprintf("https://xkcd.com/%d/info.0.json", id-1) }, StartURL: "https://xkcd.com/info.0.json", } es, err := elasticsearch.NewDefaultClient() if err != nil { crawler.log.Fatal().Err(err).Msg("Error creating Elasticsearch client") } config := xkcdsearch.StoreConfig{Client: es, IndexName: IndexName} store, err := xkcdsearch.NewStore(config) if err != nil { crawler.log.Fatal().Err(err).Msg("Cannot create store") } crawler.store = store if indexSetup { crawler.log.Info().Msg("Creating index with mapping") if err := crawler.setupIndex(); err != nil { crawler.log.Fatal().Err(err).Msg("Cannot create Elasticsearch index") } } crawler.log.Info().Msgf("Starting the crawl with %d workers at <%s>", crawler.workers, crawler.StartURL) crawler.Run() }, } // Crawler allows to crawl and index comics from xkcd.com. type Crawler struct { store *xkcdsearch.Store log zerolog.Logger workers int queue chan string reURL *regexp.Regexp nextURL func(crawler *Crawler, currentURL string) (nextURL string) StartURL string } // Run launches the crawler. func (c *Crawler) Run() { var ( wg sync.WaitGroup nextURL string ) rand.Seed(time.Now().Unix()) // Setup worker goroutines for i := 1; i < c.workers+1; i++ { c.log.Debug().Msgf("Creating worker %d", i) wg.Add(1) go func() { defer wg.Done() for url := range c.queue { c.ProcessURL(url) } }() } // Start crawling if doc := c.ProcessURL(c.StartURL); doc.ID != "" { id, err := strconv.Atoi(doc.ID) if err != nil { c.log.Fatal().Err(err).Msg("Cannot get latest comic ID") } nextURL = fmt.Sprintf("https://xkcd.com/%d/info.0.json", id) } for { if nextURL = c.NextURL(nextURL); nextURL == "" { close(c.queue) break } c.log.Debug().Str("URL", nextURL).Msg("Adding URL to queue") c.queue <- nextURL } wg.Wait() } // ProcessURL parses the JSON data and stores document. func (c *Crawler) ProcessURL(url string) (doc xkcdsearch.Document) { c.log.Debug().Str("URL", url).Msg("Processing URL") var id int if url != c.StartURL { id, err := c.documentIDFromURL(url) if err != nil { c.log.Fatal().Err(err).Msg("Error getting ID from URL") } if id == 404 { c.log.Info().Str("ID", "404").Msg("Skipping missing doc") return doc } if ok, err := c.existsDocument(strconv.Itoa(id)); ok { if err != nil { c.log.Error().Err(err).Int("ID", id).Msg("Error skipping existing doc") } else { c.log.Info().Int("ID", id).Msg("Skipping existing doc") } return doc } } res, err := c.loadURL(url) if err != nil { c.log.Error().Err(err).Int("ID", id).Msg("Error loading xkcd.com") } doc, err = c.processResponse(res) if err != nil { c.log.Error().Err(err).Str("ID", doc.ID).Msg("Error processing response") } else { err = c.storeDocument(&doc) if err != nil { c.log.Error().Err(err).Str("ID", doc.ID).Msg("Error storing doc") } else { c.log.Info().Str("ID", doc.ID).Str("title", doc.Title).Msg("Stored doc") } } time.Sleep(time.Duration(rand.Intn(100)+100) * time.Millisecond) return doc } // NextURL returns the next URL or an empty string. func (c *Crawler) NextURL(url string) string { return c.nextURL(c, url) } func (c *Crawler) documentIDFromURL(url string) (id int, err error) { m := c.reURL.FindStringSubmatch(url) if len(m) > 0 && m[1] != "" { return strconv.Atoi(m[1]) } return 0, fmt.Errorf("cannot get ID from URL [%s]", url) } func (c *Crawler) loadURL(url string) (*http.Response, error) { return http.Get(url) } func (c *Crawler) existsDocument(id string) (bool, error) { ok, err := c.store.Exists(id) if err != nil { return false, fmt.Errorf("store: %s", err) } return ok, nil } func (c *Crawler) storeDocument(doc *xkcdsearch.Document) error { return c.store.Create(doc) } func (c *Crawler) processResponse(res *http.Response) (xkcdsearch.Document, error) { defer res.Body.Close() var doc xkcdsearch.Document type jsonResponse struct { Num int `json:"num"` Year string Month string Day string Title string `json:"title"` Transcript string Alt string Link string News string Img string } if res.StatusCode != 200 { return doc, fmt.Errorf("[%s]", res.Status) } var j jsonResponse if err := json.NewDecoder(res.Body).Decode(&j); err != nil { return doc, err } jYear, err := strconv.ParseInt(j.Year, 10, 16) if err != nil { return doc, fmt.Errorf("strconv: %s", err) } jMonth, err := strconv.ParseInt(j.Month, 10, 8) if err != nil { return doc, fmt.Errorf("strconv: %s", err) } jDay, err := strconv.ParseInt(j.Day, 10, 8) if err != nil { return doc, fmt.Errorf("strconv: %s", err) } doc = xkcdsearch.Document{ ID: strconv.Itoa(j.Num), ImageURL: j.Img, Published: fmt.Sprintf("%04d-%02d-%02d", jYear, jMonth, jDay), Title: j.Title, Alt: j.Alt, Transcript: j.Transcript, Link: j.Link, News: j.News, } c.log.Debug().Interface("doc", doc).Msg("Downloaded") return doc, nil } func (c *Crawler) setupIndex() error { mapping := `{ "mappings": { "_doc": { "properties": { "id": { "type": "keyword" }, "image_url": { "type": "keyword" }, "title": { "type": "text", "analyzer": "english" }, "alt": { "type": "text", "analyzer": "english" }, "transcript": { "type": "text", "analyzer": "english" }, "published": { "type": "date" }, "link": { "type": "keyword" }, "news": { "type": "text", "analyzer": "english" } } } } }` return c.store.CreateIndex(mapping) }