in tools/gcs2bq/main.go [201:316]
// main exports Google Cloud Storage object metadata to an Avro data file.
//
// It parses the command-line flags into the package-level option variables,
// lists every project whose lifecycle state is ACTIVE, starts one
// processProject goroutine per project, and serialises every GcsFile those
// goroutines send on a shared channel into the output file. The process
// terminates with EXIT_STATUS once the file has been finalised.
//
// Setup failures (API client, project listing, schema, output file) and write
// failures panic, matching the error handling used throughout this function.
func main() {
	fmt.Fprintf(os.Stderr, "Google Cloud Storage object metadata to BigQuery, version %s\n", VERSION)

	outputFile = flag.String("file", "gcs.avro", "output file name")
	includeVersions = flag.Bool("versions", false, "include GCS object versions")
	GOMAXPROCS = flag.Int("concurrency", 4, "concurrency (GOMAXPROCS)")
	BUFFER_SIZE = flag.Int("buffer_size", 1000, "file buffer")
	AVRO_SCHEMA = flag.String("avro_schema", "embedded", "Avro schema (default: use embedded)")
	// Default glog to stderr; an explicit -logtostderr on the command line
	// still overrides this because flag.Parse runs afterwards.
	flag.Set("logtostderr", "true")
	flag.Parse()

	glog.Infof("Performance settings: GOMAXPROCS=%d, buffer size=%d", *GOMAXPROCS, *BUFFER_SIZE)
	runtime.GOMAXPROCS(*GOMAXPROCS)

	ctx := context.Background()
	crmService, err := cloudresourcemanager.NewService(ctx, option.WithUserAgent(USER_AGENT))
	if err != nil {
		panic(err)
	}

	// Collect all active projects, following pagination until the API stops
	// returning a next-page token.
	glog.Warning("Retrieving a list of all projects...")
	projectsService := crmService.Projects
	projectsList := make([]cloudresourcemanager.Project, 0)
	pageToken := ""
	for {
		response, err := projectsService.List().Filter("lifecycleState:ACTIVE").PageToken(pageToken).Do()
		if err != nil {
			panic(err)
		}
		for _, project := range response.Projects {
			glog.Infof("Found project: %s (%d)", project.ProjectId, project.ProjectNumber)
			projectsList = append(projectsList, *project)
		}
		pageToken = response.NextPageToken
		if pageToken == "" {
			break
		}
	}

	var wg sync.WaitGroup
	objectCh := make(chan GcsFile, *BUFFER_SIZE)

	// Producer side: one goroutine per project. The channel is closed only
	// after wgProject drains so the consumer's range loop below terminates.
	// NOTE(review): this relies on processProject calling Done on the
	// WaitGroup it receives - confirm against its definition.
	wg.Add(1)
	go func() {
		defer wg.Done()
		var wgProject sync.WaitGroup
		wgProject.Add(len(projectsList))
		for _, project := range projectsList {
			go processProject(&wgProject, &ctx, objectCh, project)
		}
		wgProject.Wait()
		close(objectCh)
	}()

	var schema avro.Schema
	if *AVRO_SCHEMA != "embedded" {
		if _, err = os.Stat(*AVRO_SCHEMA); os.IsNotExist(err) {
			// glog.Fatalf logs the message and terminates the process itself.
			glog.Fatalf("Could not read Avro schema file: %s", *AVRO_SCHEMA)
		}
		glog.Infof("Using custom Avro schema: %s", *AVRO_SCHEMA)
		schema, err = avro.ParseSchemaFile(*AVRO_SCHEMA)
		if err != nil {
			panic(err)
		}
	} else {
		glog.Infof("Using embedded Avro schema")
		// f here is the package-level value providing ReadFile.
		schemaData, readErr := f.ReadFile("gcs2bq.avsc")
		if readErr != nil {
			panic(readErr)
		}
		schema, err = avro.ParseSchema(string(schemaData))
		if err != nil {
			panic(err)
		}
	}

	writer := avro.NewSpecificDatumWriter()
	writer.SetSchema(schema)

	outFile, err := os.Create(*outputFile)
	if err != nil {
		panic(err)
	}
	// os.Exit at the end of main skips deferred calls, so this only covers
	// panics raised on this goroutine.
	defer outFile.Close()

	w := bufio.NewWriter(outFile)
	dfw, err := avro.NewDataFileWriter(w, schema, writer)
	if err != nil {
		panic(err)
	}

	// Consumer side: a single goroutine owns dfw while records are streaming.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for obj := range objectCh {
			avroObject, err := objectToAvro(obj.ProjectId, obj.Object)
			if err != nil {
				// Best effort: an object that cannot be converted is skipped,
				// but no longer silently.
				glog.Warningf("Skipping object in project %s that could not be converted to Avro: %v", obj.ProjectId, err)
				continue
			}
			if err := dfw.Write(avroObject); err != nil {
				panic(err)
			}
			if err := dfw.Flush(); err != nil {
				panic(err)
			}
		}
	}()
	wg.Wait()

	// Finalise from the innermost layer outwards. Closing the Avro writer may
	// emit more bytes into the bufio.Writer, so the buffer must be flushed
	// after it, not before, or those bytes never reach the file.
	if err := dfw.Close(); err != nil {
		panic(err)
	}
	if err := w.Flush(); err != nil {
		panic(err)
	}
	// Sync can legitimately fail on non-regular outputs (e.g. pipes), so a
	// failure is reported rather than treated as fatal.
	if err := outFile.Sync(); err != nil {
		glog.Warningf("Could not sync output file %s: %v", *outputFile, err)
	}

	glog.Warningf("Processing complete, output in: %s", *outputFile)
	os.Exit(EXIT_STATUS)
}