functions/events-all-firebase/all_firebase.go (105 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package all_firebase
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"log"
"time"
"cloud.google.com/go/firestore"
firebase "firebase.google.com/go/v4"
"github.com/GoogleCloudPlatform/functions-framework-go/functions"
"github.com/cloudevents/sdk-go/v2/event"
"google.golang.org/api/iterator"
)
func init() {
// Register a CloudEvent function with the Functions Framework
functions.CloudEvent("allFirebaseFunction", allFirebaseFunction)
}
// Function myCloudEventFunction accepts and handles a CloudEvent object
func allFirebaseFunction(ctx context.Context, e event.Event) error {
// Parse the JSON into a generic map
var event map[string]interface{}
if err := json.Unmarshal([]byte(e.Data()), &event); err != nil {
return fmt.Errorf("error parsing JSON: %v", err)
}
// Expect events to have a base64 encoded 'data' field. This doesn't
// make for great messages. Decode and replace before using.
// Stat by checking for 'message'
message, ok := event["message"].(map[string]interface{})
if !ok {
return fmt.Errorf("message field not found")
}
// Extract and decode the 'data' field from 'message'
if dataEncoded, ok := message["data"].(string); ok {
dataDecoded, err := base64.StdEncoding.DecodeString(dataEncoded)
if err != nil {
return fmt.Errorf("error decoding base64: %v", err)
}
// Parse the JSON into a generic map, so we get a complete
// JSON doc after we add it back to the message a re-serialize
var data map[string]interface{}
if err := json.Unmarshal([]byte(dataDecoded), &data); err != nil {
return fmt.Errorf("error parsing JSON from 'data' field: %v", err)
}
message["data"] = data
} else {
return fmt.Errorf("data field not found within message or not a string")
}
// Move all attributes to the message itself (for ease of use in the web app)
var attributes = message["attributes"].(map[string]interface{})
for key, value := range attributes {
// log.Printf("attribute: %v : %v\n", key, value)
message[key] = value
}
delete(message, "attributes")
// Time to publish to Firebase
app, err := firebase.NewApp(context.Background(), nil)
if err != nil {
log.Fatalf("error initializing app: %v\n", err)
}
client, err := app.Firestore(ctx)
if err != nil {
log.Fatalln(err)
}
defer client.Close()
// Check if event is starting a game and if so, clear out the previous events
if message["PinballEventType"] == "GameStarted" {
log.Println("GameStarted")
var buff bytes.Buffer
deleteCollection(&buff, ctx, *client, "LiveGameEvents", 40)
log.Print(buff)
}
currentTimeUTC := time.Now().UTC()
timestampUnix := currentTimeUTC.UnixMilli()
// Add the utcTimestamp field to the message data
message["utcTimestamp"] = timestampUnix
publishTime, ok := message["publishTime"].(string)
if !ok {
return fmt.Errorf("messageId field not found")
}
_, err = client.Collection("LiveGameEvents").Doc(publishTime).Set(ctx, message)
if err != nil {
// If that doesn't work, not much we can do. Log and exit.
log.Fatalf("An error has occurred: %s", err)
}
// Write to the AllGameEvents collection
_, err = client.Collection("AllGameEvents").Doc(publishTime).Set(ctx, message)
if err != nil {
log.Fatalf("Failed to write to AllGameEvents: %v", err)
}
return nil
}
// from: https://firebase.google.com/docs/firestore/manage-data/delete-data
// "The snippets below are somewhat simplified and do not deal with error
// handling, security, deleting subcollections, or maximizing performance"
func deleteCollection(w io.Writer, ctx context.Context, client firestore.Client, collectionName string,
batchSize int) error {
col := client.Collection(collectionName)
bulkwriter := client.BulkWriter(ctx)
for {
// Get a batch of documents
iter := col.Limit(batchSize).Documents(ctx)
numDeleted := 0
// Iterate through the documents, adding
// a delete operation for each one to the BulkWriter.
for {
doc, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
bulkwriter.Delete(doc.Ref)
numDeleted++
}
// If there are no documents to delete,
// the process is over.
if numDeleted == 0 {
bulkwriter.End()
break
}
bulkwriter.Flush()
}
fmt.Fprintf(w, "Deleted collection \"%s\"", collectionName)
return nil
}