go/greeter/greeter.go (89 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "fmt" "github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun" "net/http" ) type GreetRequest struct { Name string `json:"name"` Visits int32 `json:"visits"` } type EgressRecord struct { Topic string `json:"topic"` Payload string `json:"payload"` } var ( PersonTypeName = statefun.TypeNameFrom("example/person") GreeterTypeName = statefun.TypeNameFrom("example/greeter") PlaygroundEgressTypeName = statefun.TypeNameFrom("io.statefun.playground/egress") GreetRequestType = statefun.MakeJsonType(statefun.TypeNameFrom("example/GreetRequest")) EgressRecordType = statefun.MakeJsonType(statefun.TypeNameFrom("io.statefun.playground/EgressRecord")) ) type Person struct { Visits statefun.ValueSpec } func (p *Person) Invoke(ctx statefun.Context, message statefun.Message) error { // update the visit count. var visits int32 ctx.Storage().Get(p.Visits, &visits) visits += 1 fmt.Printf("seen %d", visits) ctx.Storage().Set(p.Visits, visits) // enrich the request with the number of visits. var request GreetRequest if err := message.As(GreetRequestType, &request); err != nil { return fmt.Errorf("failed to deserialize greet reqeuest: %w", err) } request.Visits = visits // next, we will forward a message to a special greeter function, // that will compute a personalized greeting based on the number // of visits that this person has been seen. ctx.Send(statefun.MessageBuilder{ Target: statefun.Address{ FunctionType: GreeterTypeName, Id: request.Name, }, Value: request, ValueType: GreetRequestType, }) return nil } func Greeter(ctx statefun.Context, message statefun.Message) error { var request GreetRequest if err := message.As(GreetRequestType, &request); err != nil { return fmt.Errorf("failed to deserialize greet reqeuest: %w", err) } greeting := computeGreeting(request.Name, request.Visits) egressRecord := EgressRecord { Topic: "greetings", Payload: greeting, } ctx.SendEgress(statefun.GenericEgressBuilder{ Target: PlaygroundEgressTypeName, Value: egressRecord, ValueType: EgressRecordType, }) return nil } func computeGreeting(name string, visits int32) string { templates := []string{"", "Welcome %s", "Nice to see you again %s", "Third time is the charm %s"} if visits < int32(len(templates)) { return fmt.Sprintf(templates[visits], name) } return fmt.Sprintf("Nice to see you for the %d-th time %s!", visits, name) } func main() { builder := statefun.StatefulFunctionsBuilder() person := &Person{ Visits: statefun.ValueSpec{ Name: "visits", ValueType: statefun.Int32Type, }, } _ = builder.WithSpec(statefun.StatefulFunctionSpec{ FunctionType: PersonTypeName, States: []statefun.ValueSpec{person.Visits}, Function: person, }) _ = builder.WithSpec(statefun.StatefulFunctionSpec{ FunctionType: GreeterTypeName, Function: statefun.StatefulFunctionPointer(Greeter), }) http.Handle("/statefun", builder.AsHandler()) _ = http.ListenAndServe(":8000", nil) }