components/webscraper/main.go
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"regexp"
"strconv"
"strings"
"cloud.google.com/go/firestore"
"cloud.google.com/go/storage"
"github.com/gocolly/colly"
"github.com/gocolly/colly/debug"
"google.golang.org/api/iterator"
)
// ScrapedDocument represents metadata about a scraped document
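// For example (illustrative values), an entry serializes to:
//   {"filename": "docs_page.html", "url": "https://example.com/docs/page",
//    "gcs_path": "gs://my-project-downloads-my-engine/docs_page.html",
//    "content_type": "text/html"}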
type ScrapedDocument struct {
Filename string `json:"filename"`
URL string `json:"url"`
GCSPath string `json:"gcs_path"`
ContentType string `json:"content_type"`
}
// JobInput represents the input data for a scraping job
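// The job document's input_data field holds this struct as a JSON string, e.g.
// (illustrative values):
//   {"url": "https://example.com", "query_engine_name": "my-engine", "depth_limit": "2"}
// Note that depth_limit arrives as a string and is parsed with strconv.Atoi during validation.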
type JobInput struct {
URL string `json:"url"`
EngineName string `json:"query_engine_name"`
DepthLimit string `json:"depth_limit"`
}
// Global Cloud Storage client, initialized in main and shared by the GCS helper functions below.
var storageClient *storage.Client
func main() {
ctx := context.Background()
// Configure logger
configureLogger()
// Retrieve environment variables
projectID, jobID := getEnvVariables()
// Initialize Firestore client
firestoreClient := initializeFirestore(ctx, projectID)
defer firestoreClient.Close()
// Initialize storage client
var err error
storageClient, err = storage.NewClient(ctx)
if err != nil {
log.Printf("Failed to create storage client: %v", err)
os.Exit(1)
}
defer storageClient.Close()
// Retrieve and parse job document
jobDoc, docRef := fetchJobDocument(ctx, firestoreClient, jobID)
jobInput := parseJobInput(ctx, jobDoc, docRef)
// Generate and initialize bucket
bucketName := generateAndInitializeBucket(ctx, projectID, jobInput, docRef)
// Set up Colly collector
collector, scrapedDocs := setupCollector(ctx, jobInput, bucketName, docRef)
// Start scraping
err = collector.Visit(jobInput.URL)
if err != nil {
log.Printf("Error starting scrape: %v", err)
}
collector.Wait()
log.Printf("Scraping complete. Found %d documents", len(*scrapedDocs))
// Save results and update job status
saveResults(ctx, firestoreClient, docRef, scrapedDocs)
}
func configureLogger() {
log.SetOutput(os.Stdout)
}
func getEnvVariables() (string, string) {
projectID := os.Getenv("GCP_PROJECT")
if projectID == "" {
err := fmt.Errorf("GCP_PROJECT environment variable not set")
log.Print(err)
os.Exit(1)
}
jobID := os.Getenv("JOB_ID")
if jobID == "" {
err := fmt.Errorf("JOB_ID environment variable not set")
log.Print(err)
os.Exit(1)
}
log.Printf("Processing job ID: %s", jobID)
return projectID, jobID
}
func initializeFirestore(ctx context.Context, projectID string) *firestore.Client {
firestoreClient, err := firestore.NewClient(ctx, projectID)
if err != nil {
log.Printf("Failed to create Firestore client: %v", err)
os.Exit(1)
}
return firestoreClient
}
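// fetchJobDocument loads the batch_jobs/<jobID> document describing this scraping job.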
func fetchJobDocument(ctx context.Context, firestoreClient *firestore.Client, jobID string) (*firestore.DocumentSnapshot, *firestore.DocumentRef) {
collectionName := "batch_jobs"
docRef := firestoreClient.Collection(collectionName).Doc(jobID)
	jobDoc, err := docRef.Get(ctx)
	if err != nil {
		updateJobError(ctx, docRef, fmt.Errorf("failed to get job document: %v", err))
		log.Print(err)
		// Exit rather than return a nil snapshot, which would panic in parseJobInput.
		os.Exit(1)
	}
return jobDoc, docRef
}
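// parseJobInput decodes the JSON string stored in the job document's input_data
// field and validates that the URL, depth limit, and query engine name are present.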
func parseJobInput(ctx context.Context, jobDoc *firestore.DocumentSnapshot, docRef *firestore.DocumentRef) JobInput {
var jobInput JobInput
inputData, ok := jobDoc.Data()["input_data"].(string)
if !ok {
err := fmt.Errorf("failed to get input_data as string from job document")
updateJobError(ctx, docRef, err)
log.Print(err)
return jobInput
}
if err := json.Unmarshal([]byte(inputData), &jobInput); err != nil {
updateJobError(ctx, docRef, fmt.Errorf("failed to decode job input: %v", err))
log.Print(err)
return jobInput
}
log.Printf("Job input: %+v", jobInput)
// Validate job input
if jobInput.URL == "" {
err := fmt.Errorf("URL not found in input data")
updateJobError(ctx, docRef, err)
log.Print(err)
}
	if jobInput.DepthLimit == "" {
		err := fmt.Errorf("depth limit not found in input data")
		updateJobError(ctx, docRef, err)
		log.Print(err)
	} else if _, err := strconv.Atoi(jobInput.DepthLimit); err != nil {
		// Report a single error for a malformed (rather than missing) depth limit.
		err = fmt.Errorf("invalid depth limit value %s", jobInput.DepthLimit)
		updateJobError(ctx, docRef, err)
		log.Print(err)
	}
if jobInput.EngineName == "" {
err := fmt.Errorf("query engine name not found in input data")
updateJobError(ctx, docRef, err)
log.Print(err)
}
return jobInput
}
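// generateAndInitializeBucket derives the download bucket name from the project ID
// and query engine name, then creates the bucket or clears its existing contents.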
func generateAndInitializeBucket(ctx context.Context, projectID string, jobInput JobInput, docRef *firestore.DocumentRef) string {
// Generate bucket name
bucketName, err := generateBucketName(projectID, jobInput.EngineName)
if err != nil {
updateJobError(ctx, docRef, err)
log.Print(err)
}
log.Printf("Using bucket: %s", bucketName)
// Initialize bucket
if err := initializeBucket(ctx, projectID, bucketName); err != nil {
updateJobError(ctx, docRef, fmt.Errorf("failed to initialize bucket: %v", err))
log.Print(err)
}
return bucketName
}
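// setupCollector builds the Colly collector: it restricts crawling to the start
// URL's domain (with and without the "www." prefix), applies the configured depth
// limit, runs asynchronously with a parallelism cap of 5, and registers handlers
// that save responses to GCS and follow links.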
func setupCollector(ctx context.Context, jobInput JobInput, bucketName string, docRef *firestore.DocumentRef) (*colly.Collector, *[]ScrapedDocument) {
var scrapedDocs []ScrapedDocument
maxDepth := 1 // Default maxDepth to 1
// Parse maxDepth from jobInput, defaulting to 1 if invalid
if depth, err := strconv.Atoi(jobInput.DepthLimit); err == nil {
maxDepth = depth
} else {
log.Printf("Invalid depth limit in input: %s, defaulting to 1", jobInput.DepthLimit)
}
log.Printf("Max depth set to: %d", maxDepth)
baseDomain := extractDomain(jobInput.URL)
allowedDomains := []string{
baseDomain,
"www." + baseDomain,
}
log.Printf("Allowed domains: %v", allowedDomains)
// Create a new collector
c := colly.NewCollector(
colly.MaxDepth(maxDepth),
colly.AllowedDomains(allowedDomains...), // Allow both with and without www
colly.Debugger(&debug.LogDebugger{}),
colly.Async(true),
colly.UserAgent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36"),
colly.IgnoreRobotsTxt(),
)
err := c.Limit(&colly.LimitRule{
DomainGlob: "*",
Parallelism: 5,
})
if err != nil {
log.Printf("Error setting limit rule: %v", err)
}
// Add error handling
c.OnError(func(r *colly.Response, err error) {
log.Printf("Error scraping %s: %v", r.Request.URL, err)
})
	// Log when starting a new page; the seed request carries no "depth" value in its
	// context, so %v is used to avoid a malformed log line on the first visit.
	c.OnRequest(func(r *colly.Request) {
		log.Printf("Visiting %s (depth: %v)", r.URL.String(), r.Ctx.GetAny("depth"))
	})
// Handle all responses
c.OnResponse(func(r *colly.Response) {
handleResponse(r, bucketName, &scrapedDocs)
})
// Handle HTML elements
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
handleHTML(e, c, maxDepth)
})
return c, &scrapedDocs
}
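// handleResponse saves HTML and PDF responses to the GCS bucket and appends
// their metadata to the list of scraped documents; other content types are skipped.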
func handleResponse(r *colly.Response, bucketName string, scrapedDocs *[]ScrapedDocument) {
contentType := r.Headers.Get("Content-Type")
log.Printf("Got response from %s (type: %s)", r.Request.URL, contentType)
// Skip non-HTML, non-PDF content
if !strings.Contains(contentType, "text/html") && !strings.Contains(contentType, "application/pdf") {
log.Printf("Skipping non-HTML/PDF content type: %s", contentType)
return
}
// Generate filename from URL
filename := sanitizeFilename(r.Request.URL.String())
if strings.Contains(contentType, "application/pdf") {
if !strings.HasSuffix(filename, ".pdf") {
filename += ".pdf"
}
} else {
if !strings.HasSuffix(filename, ".html") {
filename += ".html"
}
}
// Create GCS path
gcsPath := fmt.Sprintf("gs://%s/%s", bucketName, filename)
log.Printf("Saving content to: %s", gcsPath)
// Write content to GCS
if err := writeDataToGCS(context.Background(), bucketName, filename, r.Body); err != nil {
log.Printf("Error writing to GCS: %v", err)
return
}
// Add to scraped documents
doc := ScrapedDocument{
URL: r.Request.URL.String(),
Filename: filename,
GCSPath: gcsPath,
ContentType: contentType,
}
*scrapedDocs = append(*scrapedDocs, doc)
log.Printf("Successfully saved document: %s", gcsPath)
}
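// handleHTML follows links found in anchor tags. A depth counter is carried
// explicitly in the request context and checked against maxDepth here, since
// links are issued with c.Request and a fresh colly.Context rather than
// e.Request.Visit (note, as an assumption: colly's own MaxDepth option may not
// account for requests created this way, so this manual check is what enforces
// the configured limit).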
func handleHTML(e *colly.HTMLElement, c *colly.Collector, maxDepth int) {
link := e.Attr("href")
log.Printf("Found link: %s", link)
// Handle relative URLs
absoluteURL := e.Request.AbsoluteURL(link)
if absoluteURL == "" {
log.Printf("Skipping invalid URL: %s", link)
return
}
// Get the current depth from the context
currentDepth, ok := e.Request.Ctx.GetAny("depth").(int)
if !ok {
currentDepth = 0 // Default to 0 if not found
}
// Check if the maximum depth has been reached
if currentDepth >= maxDepth {
log.Printf("Max depth reached at %s (depth: %d)", absoluteURL, currentDepth)
return
}
// Visit link with incremented depth in context
log.Printf("Visiting URL: %s (depth: %d)", absoluteURL, currentDepth+1)
// Create a new context
visitCtx := colly.NewContext()
visitCtx.Put("depth", currentDepth + 1)
if err := c.Request("GET", absoluteURL, nil, visitCtx, nil); err != nil {
log.Printf("Error visiting %s: %v", absoluteURL, err)
}
}
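// saveResults writes the scraped document metadata to the job's result_data field
// and marks the job as succeeded, truncating the list if the serialized payload
// would exceed Firestore's document size limit.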
func saveResults(ctx context.Context, firestoreClient *firestore.Client, docRef *firestore.DocumentRef, scrapedDocs *[]ScrapedDocument) {
// Marshal the entire scrapedDocs to JSON to check the total size
fullResultDataJSON, err := json.Marshal(map[string]interface{}{
"scraped_documents": scrapedDocs,
})
if err != nil {
updateJobError(ctx, docRef, fmt.Errorf("error marshaling resultData: %v", err))
log.Print(err)
return
}
	// Check whether the total size exceeds the Firestore document limit.
	// Use 1,000,000 bytes rather than the hard 1,048,576-byte (1 MiB) maximum to leave
	// headroom for Firestore's per-document overhead.
if len(fullResultDataJSON) > 1000000 {
log.Printf("resultData size exceeds Firestore limit. Truncating data.")
// If it exceeds, truncate the data to fit within the limit
var truncatedDocs []ScrapedDocument
truncatedSize := 0
// Iterate through the scraped documents and add them to the truncated list
// until the size is close to but does not exceed the limit
for _, doc := range *scrapedDocs {
docJSON, err := json.Marshal(doc)
if err != nil {
updateJobError(ctx, docRef, fmt.Errorf("error marshaling individual document: %v", err))
log.Print(err)
return
}
if truncatedSize+len(docJSON) > 1000000 {
// Stop adding documents if the next one would exceed the limit
break
}
truncatedDocs = append(truncatedDocs, doc)
truncatedSize += len(docJSON)
}
// Prepare the truncated data to be written to Firestore
resultData := map[string]interface{}{
"scraped_documents": truncatedDocs,
}
// Update Firestore with the truncated data
_, err = docRef.Update(ctx, []firestore.Update{
{Path: "result_data", Value: resultData},
{Path: "status", Value: "succeeded"},
})
if err != nil {
updateJobError(ctx, docRef, fmt.Errorf("failed to update job document with truncated data: %v", err))
log.Print(err)
return
}
log.Printf("Successfully updated job with truncated scraped documents")
return
}
// If the total size is within the limit, proceed with updating the Firestore document as is
resultData := map[string]interface{}{
"scraped_documents": scrapedDocs,
}
_, err = docRef.Update(ctx, []firestore.Update{
{Path: "result_data", Value: resultData},
{Path: "status", Value: "succeeded"},
})
if err != nil {
updateJobError(ctx, docRef, fmt.Errorf("failed to update job document: %v", err))
log.Print(err)
return
}
log.Printf("Successfully updated job with %d scraped documents", len(*scrapedDocs))
}
// sanitizeFilename sanitizes the URL to create a safe filename
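// For example (illustrative), "https://example.com/docs/page?id=1" becomes
// "docs_page_id_1"; the ".html" or ".pdf" extension is appended later in handleResponse.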
func sanitizeFilename(url string) string {
// Remove the scheme and domain
parts := strings.SplitN(url, "://", 2)
if len(parts) == 2 {
parts = strings.SplitN(parts[1], "/", 2)
if len(parts) == 2 {
url = parts[1]
}
}
// Replace invalid characters
safe := strings.Map(func(r rune) rune {
if (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || (r >= '0' && r <= '9') || r == '-' || r == '_' || r == '.' {
return r
}
return '_'
}, url)
// Truncate if too long
if len(safe) > 200 {
safe = safe[:200]
}
return safe
}
// extractDomain extracts the base domain from a URL
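// For example, "https://www.example.com/docs/page" yields "example.com".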
func extractDomain(url string) string {
domain := ""
parts := strings.SplitN(url, "://", 2)
if len(parts) == 2 {
// Get the domain part
domain = strings.SplitN(parts[1], "/", 2)[0]
// Handle www and other subdomains
domain = strings.TrimPrefix(domain, "www.")
}
log.Printf("Extracted domain: %s from URL: %s", domain, url)
return domain
}
// generateBucketName generates a GCS bucket name based on project ID and engine name
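// For example (illustrative), project "my-project" and engine name "My Query Engine"
// produce "my-project-downloads-my-query-engine".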
func generateBucketName(projectID string, qEngineName string) (string, error) {
// Replace spaces and underscores with hyphens, convert to lowercase
qeName := strings.ToLower(qEngineName)
qeName = strings.ReplaceAll(qeName, " ", "-")
qeName = strings.ReplaceAll(qeName, "_", "-")
bucketName := fmt.Sprintf("%s-downloads-%s", projectID, qeName)
// Validate bucket name matches GCS requirements
if match, _ := regexp.MatchString("^[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]$", bucketName); !match {
return "", fmt.Errorf("invalid bucket name: %s", bucketName)
}
return bucketName, nil
}
// initializeBucket initializes the GCS bucket by creating it if it doesn't exist
// or clearing its contents if it does
func initializeBucket(ctx context.Context, projectID, bucketName string) error {
bucket := storageClient.Bucket(bucketName)
// Check if bucket exists
_, err := bucket.Attrs(ctx)
if err == storage.ErrBucketNotExist {
log.Printf("Creating bucket %s", bucketName)
if err := bucket.Create(ctx, projectID, nil); err != nil {
return fmt.Errorf("error creating bucket: %v", err)
}
} else if err != nil {
return fmt.Errorf("error checking bucket: %v", err)
} else {
// Bucket exists, clear all objects
log.Printf("Clearing existing objects from bucket %s", bucketName)
it := bucket.Objects(ctx, nil)
for {
attrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("error listing objects: %v", err)
}
if err := bucket.Object(attrs.Name).Delete(ctx); err != nil {
return fmt.Errorf("error deleting object %s: %v", attrs.Name, err)
}
}
}
return nil
}
// writeDataToGCS writes data to the specified GCS bucket and filename
func writeDataToGCS(ctx context.Context, bucketName string, filename string, content []byte) error {
// Use global storage client instead of creating new one
bucket := storageClient.Bucket(bucketName)
// Write content to GCS
obj := bucket.Object(filename)
writer := obj.NewWriter(ctx)
if _, err := writer.Write(content); err != nil {
return fmt.Errorf("error writing to GCS: %v", err)
}
if err := writer.Close(); err != nil {
return fmt.Errorf("error closing GCS writer: %v", err)
}
log.Printf("Successfully wrote %s to GCS bucket %s", filename, bucketName)
return nil
}
// updateJobError updates the job document with the provided error and sets status to failed
func updateJobError(ctx context.Context, docRef *firestore.DocumentRef, err error) {
_, updateErr := docRef.Update(ctx, []firestore.Update{
{Path: "errors", Value: []string{err.Error()}},
{Path: "status", Value: "failed"},
})
if updateErr != nil {
log.Printf("Failed to update job error status: %v", updateErr)
}
}