func main()

in tools/gcs2bq/main.go [201:316]


// main is the gcs2bq entry point. It lists every ACTIVE project visible via
// the Cloud Resource Manager API and starts one processProject goroutine per
// project. Those goroutines send GcsFile values on a shared channel. A single
// writer goroutine converts each value with objectToAvro and appends it to an
// Avro container file.
//
// Flags:
//
//	-file         output file name (default "gcs.avro")
//	-versions     include GCS object versions
//	-concurrency  value passed to runtime.GOMAXPROCS (default 4)
//	-buffer_size  capacity of the channel between producers and writer (default 1000)
//	-avro_schema  path to a custom Avro schema file, or "embedded" (default) for gcs2bq.avsc
//
// Unrecoverable setup errors panic. Output finalization errors are fatal.
// On success the process exits with EXIT_STATUS.
func main() {
	fmt.Fprintf(os.Stderr, "Google Cloud Storage object metadata to BigQuery, version %s\n", VERSION)

	outputFile = flag.String("file", "gcs.avro", "output file name")
	includeVersions = flag.Bool("versions", false, "include GCS object versions")
	GOMAXPROCS = flag.Int("concurrency", 4, "concurrency (GOMAXPROCS)")
	BUFFER_SIZE = flag.Int("buffer_size", 1000, "file buffer")
	AVRO_SCHEMA = flag.String("avro_schema", "embedded", "Avro schema (default: use embedded)")
	// Set before Parse, so an explicit -logtostderr on the command line still wins.
	flag.Set("logtostderr", "true")
	flag.Parse()

	glog.Infof("Performance settings: GOMAXPROCS=%d, buffer size=%d", *GOMAXPROCS, *BUFFER_SIZE)
	runtime.GOMAXPROCS(*GOMAXPROCS)
	ctx := context.Background()

	crmService, err := cloudresourcemanager.NewService(ctx, option.WithUserAgent(USER_AGENT))
	if err != nil {
		panic(err)
	}

	glog.Warning("Retrieving a list of all projects...")
	projectsService := crmService.Projects
	var projectsList []cloudresourcemanager.Project
	pageToken := ""
	for {
		response, err := projectsService.List().Filter("lifecycleState:ACTIVE").PageToken(pageToken).Do()
		if err != nil {
			panic(err)
		}
		for _, project := range response.Projects {
			glog.Infof("Found project: %s (%d)", project.ProjectId, project.ProjectNumber)
			projectsList = append(projectsList, *project)
		}
		// An empty NextPageToken marks the last page.
		pageToken = response.NextPageToken
		if pageToken == "" {
			break
		}
	}

	var wg sync.WaitGroup

	objectCh := make(chan GcsFile, *BUFFER_SIZE)
	wg.Add(1)
	go func() {
		defer wg.Done()

		// Each processProject call receives wgProject and must call Done on it.
		// The channel is closed only after every project goroutine has
		// finished, which ends the writer's range loop below.
		var wgProject sync.WaitGroup
		wgProject.Add(len(projectsList))
		for _, project := range projectsList {
			go processProject(&wgProject, &ctx, objectCh, project)
		}
		wgProject.Wait()
		close(objectCh)
	}()

	var schema avro.Schema
	if *AVRO_SCHEMA != "embedded" {
		if _, err := os.Stat(*AVRO_SCHEMA); os.IsNotExist(err) {
			// glog.Fatalf logs and terminates the process; nothing after it runs.
			glog.Fatalf("Could not read Avro schema file: %s", *AVRO_SCHEMA)
		}

		glog.Infof("Using custom Avro schema: %s", *AVRO_SCHEMA)
		schema, err = avro.ParseSchemaFile(*AVRO_SCHEMA)
		if err != nil {
			panic(err)
		}
	} else {
		glog.Infof("Using embedded Avro schema")
		// f is the package-level file system (presumably an embed.FS) that holds
		// gcs2bq.avsc. The output file below is named outFile so it does not shadow f.
		var schemaData []byte
		schemaData, err = f.ReadFile("gcs2bq.avsc")
		if err != nil {
			panic(err)
		}
		schema, err = avro.ParseSchema(string(schemaData))
		if err != nil {
			panic(err)
		}
	}

	writer := avro.NewSpecificDatumWriter()
	writer.SetSchema(schema)

	outFile, err := os.Create(*outputFile)
	if err != nil {
		panic(err)
	}

	w := bufio.NewWriter(outFile)
	dfw, err := avro.NewDataFileWriter(w, schema, writer)
	if err != nil {
		panic(err)
	}

	// Records buffered in dfw before they are flushed as one Avro block.
	// Flushing after every record would emit one block plus a 16-byte sync
	// marker per object, inflating the file and defeating per-block codecs.
	const recordsPerBlock = 1000

	wg.Add(1)
	go func() {
		defer wg.Done()
		pending := 0
		for obj := range objectCh {
			avroObject, err := objectToAvro(obj.ProjectId, obj.Object)
			if err != nil {
				// Best-effort: skip objects that cannot be converted, but leave a trace.
				glog.Warningf("Skipping object from project %s, Avro conversion failed: %v", obj.ProjectId, err)
				continue
			}
			if err := dfw.Write(avroObject); err != nil {
				panic(err)
			}
			pending++
			if pending >= recordsPerBlock {
				if err := dfw.Flush(); err != nil {
					panic(err)
				}
				pending = 0
			}
		}
		// Emit the final, partially filled block.
		if err := dfw.Flush(); err != nil {
			panic(err)
		}
	}()
	wg.Wait()

	// Finish the Avro container before flushing the bufio.Writer beneath it.
	// Anything dfw still writes must reach w before w.Flush. os.Exit skips
	// deferred calls, so the file is synced and closed explicitly here.
	if err := dfw.Close(); err != nil {
		glog.Fatalf("Could not finalize Avro output %s: %v", *outputFile, err)
	}
	if err := w.Flush(); err != nil {
		glog.Fatalf("Could not flush output %s: %v", *outputFile, err)
	}
	if err := outFile.Sync(); err != nil {
		glog.Fatalf("Could not sync output %s: %v", *outputFile, err)
	}
	if err := outFile.Close(); err != nil {
		glog.Fatalf("Could not close output %s: %v", *outputFile, err)
	}
	glog.Warningf("Processing complete, output in: %s", *outputFile)
	os.Exit(EXIT_STATUS)
}