forwarder/main.go (125 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"encoding/json"
"io"
"log"
"net/http"
"os"
"time"
"cloud.google.com/go/bigtable"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
securityreport "github.com/GoogleCloudPlatform/reporting-api-processor/forwarder/proto"
)
const (
btWriteBufSize = 1000
interval = 10 * time.Second
)
var (
bt *bigtable.Client
reportCh chan *securityreport.SecurityReport
)
func init() {
reportCh = make(chan *securityreport.SecurityReport, btWriteBufSize*10)
}
func main() {
project := os.Getenv("BT_PROJECT")
instance := os.Getenv("BT_INSTANCE")
if project == "" || instance == "" {
log.Fatalf("either of environment variables BT_PROJECT or BT_INSTANCE is empty.")
}
ctx := context.Background()
var err error
bt, err = bigtable.NewClient(ctx, project, instance)
defer func() {
if err := bt.Close(); err != nil {
log.Fatalf("failed to close Cloud BigTable client: %v", err)
}
}()
if err != nil {
log.Fatalf("failed to create BigTable client: %v", err)
}
go reportWriter(ctx, reportCh)
e := echo.New()
e.Debug = true
e.HideBanner = true
e.HidePort = true
// In order to handle Reporting API, the reporting endpoint needs to handle CORS
e.Use(middleware.CORSWithConfig(middleware.DefaultCORSConfig))
e.POST("/main", mainHandler)
e.POST("/default", mainHandler)
e.GET("/_healthz", healthzHandler)
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
if err := e.Start(":" + port); err != nil {
log.Fatalf("failure occurred on launching HTTP server: %v", err)
}
}
func healthzHandler(c echo.Context) error {
return c.String(http.StatusOK, "OK")
}
func mainHandler(c echo.Context) error {
r := c.Request()
contentType := r.Header.Get("Content-Type")
if contentType != "application/reports+json" {
c.Echo().Logger.Errorf("Content-Type header is not application/reports+json: %v", r.Header)
return c.String(http.StatusBadRequest, "Content-Type not supported. The Content-Type must be application/reports+json.")
}
data, err := io.ReadAll(r.Body)
if err != nil {
c.Echo().Logger.Errorf("error on reading data: %v", err)
}
c.Echo().Logger.Debug(string(data))
if err := r.Body.Close(); err != nil {
return err
}
var buf []map[string]interface{}
err = json.Unmarshal(data, &buf)
if err != nil {
c.Echo().Logger.Errorf("error on parsing JSON: %v", err)
return c.String(http.StatusInternalServerError, err.Error())
}
c.Echo().Logger.Debugf("queueing %v reports", len(buf))
for _, b := range buf {
// TODO(yoshifumi): Extrace values with keys and store them into SecurityReport
reportCh <- mapToSecurityReport(c.Logger(), b)
}
return c.String(http.StatusNoContent, "OK")
}
func reportWriter(ctx context.Context, ch chan *securityreport.SecurityReport) {
t := time.NewTicker(interval)
table := bt.Open(tableName)
defer t.Stop()
for {
select {
case <-t.C:
size := min(len(ch), btWriteBufSize)
// log.Printf("buffered %v reports", size)
if size == 0 {
// log.Printf("no reports buffered. skipping.")
continue
}
muts := make([]*bigtable.Mutation, size)
keys := make([]string, size)
for i := 0; i < size; i++ {
muts[i] = bigtable.NewMutation()
r := <-ch
setSecurityReport(muts[i], r)
keys[i] = generateRowKey(r)
}
rowErrs, err := table.ApplyBulk(ctx, keys, muts)
if err != nil {
log.Printf("could not apply bulk row mutation: %v", err)
}
for _, rowErr := range rowErrs {
log.Printf("error writing row: %v", rowErr)
}
log.Printf("wrote %v reports", size)
}
}
}
func min(a, b int) int {
if a < b {
return a
}
return b
}