go/greeter/greeter.go (89 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
	"fmt"
	"log"
	"net/http"

	"github.com/apache/flink-statefun/statefun-sdk-go/v3/pkg/statefun"
)
// GreetRequest is the message that Person.Invoke and Greeter decode from
// their incoming messages. The struct tags map its fields to the JSON keys
// "name" and "visits".
type GreetRequest struct {
// Name identifies the person being greeted. Person.Invoke uses it as the id
// of the greeter instance it forwards to, and Greeter puts it in the greeting.
Name string `json:"name"`
// Visits is the visit count. Person.Invoke overwrites it with the updated
// counter before forwarding; Greeter uses it to pick the greeting.
Visits int32 `json:"visits"`
}
// EgressRecord is the value that Greeter sends to the playground egress.
// The struct tags map its fields to the JSON keys "topic" and "payload".
type EgressRecord struct {
// Topic names the destination of the record; Greeter always sets it to
// "greetings".
Topic string `json:"topic"`
// Payload is the record body; Greeter fills it with the computed greeting.
Payload string `json:"payload"`
}
// Type names and message types shared by the functions in this file.
var (
// PersonTypeName is the function type under which main registers Person.
PersonTypeName = statefun.TypeNameFrom("example/person")
// GreeterTypeName is the function type under which main registers Greeter;
// Person.Invoke addresses its outgoing messages to it.
GreeterTypeName = statefun.TypeNameFrom("example/greeter")
// PlaygroundEgressTypeName is the egress target that Greeter sends its
// records to.
PlaygroundEgressTypeName = statefun.TypeNameFrom("io.statefun.playground/egress")
// GreetRequestType is the message type used to decode incoming GreetRequest
// values and to tag the one that Person.Invoke forwards.
GreetRequestType = statefun.MakeJsonType(statefun.TypeNameFrom("example/GreetRequest"))
// EgressRecordType is the value type attached to the EgressRecord that
// Greeter emits.
EgressRecordType = statefun.MakeJsonType(statefun.TypeNameFrom("io.statefun.playground/EgressRecord"))
)
// Person is the function that main registers under PersonTypeName. Its Invoke
// method keeps a visit counter and forwards each request to the greeter.
type Person struct {
// Visits is the spec of the state value that Invoke reads and writes through
// ctx.Storage(). main declares it with the name "visits" and
// statefun.Int32Type.
Visits statefun.ValueSpec
}
// Invoke handles a GreetRequest addressed to a person. It increments the
// visit counter stored under p.Visits, stamps the new count onto the request
// and forwards the request to the greeter function whose id is the person's
// name.
//
// It returns an error only when the incoming message cannot be decoded as a
// GreetRequest.
func (p *Person) Invoke(ctx statefun.Context, message statefun.Message) error {
	// Decode first so that a malformed message is rejected before the visit
	// counter is touched or logged.
	var request GreetRequest
	if err := message.As(GreetRequestType, &request); err != nil {
		return fmt.Errorf("failed to deserialize greet request: %w", err)
	}

	// Update the visit count. The result of Get is deliberately not
	// inspected: visits starts at its zero value, so a first visit yields 1
	// (assumes Get leaves the receiver untouched when nothing is stored yet —
	// confirm against the SDK).
	var visits int32
	ctx.Storage().Get(p.Visits, &visits)
	visits++
	fmt.Printf("seen %d\n", visits)
	ctx.Storage().Set(p.Visits, visits)

	// Enrich the request with the number of visits and hand it to the
	// greeter function, which computes a personalized greeting from it.
	request.Visits = visits
	ctx.Send(statefun.MessageBuilder{
		Target: statefun.Address{
			FunctionType: GreeterTypeName,
			Id:           request.Name,
		},
		Value:     request,
		ValueType: GreetRequestType,
	})

	return nil
}
// Greeter decodes a GreetRequest, computes a greeting from its name and visit
// count, and sends that greeting to the playground egress as an EgressRecord
// on the "greetings" topic.
//
// It returns an error only when the incoming message cannot be decoded as a
// GreetRequest.
func Greeter(ctx statefun.Context, message statefun.Message) error {
	var request GreetRequest
	if err := message.As(GreetRequestType, &request); err != nil {
		return fmt.Errorf("failed to deserialize greet request: %w", err)
	}

	ctx.SendEgress(statefun.GenericEgressBuilder{
		Target: PlaygroundEgressTypeName,
		Value: EgressRecord{
			Topic:   "greetings",
			Payload: computeGreeting(request.Name, request.Visits),
		},
		ValueType: EgressRecordType,
	})

	return nil
}
// computeGreeting returns the greeting for name on their visits-th visit.
//
// The first three visits each have a dedicated message; later visits get a
// generic message that mentions the count. A non-positive count has no
// greeting and yields the empty string: formatting an empty template with an
// argument would produce "%!(EXTRA ...)" noise, and a negative count must not
// be used as a slice index.
func computeGreeting(name string, visits int32) string {
	// templates[i] is the greeting for visit number i+1.
	templates := []string{
		"Welcome %s",
		"Nice to see you again %s",
		"Third time is the charm %s",
	}

	if visits <= 0 {
		return ""
	}
	if int(visits) <= len(templates) {
		return fmt.Sprintf(templates[visits-1], name)
	}
	return fmt.Sprintf("Nice to see you for the %d-th time %s!", visits, name)
}
// main registers the person and greeter functions with a stateful functions
// builder and serves the resulting handler at /statefun on port 8000.
//
// Registration and server errors are fatal: they are logged and the process
// exits with a non-zero status instead of terminating silently.
func main() {
	builder := statefun.StatefulFunctionsBuilder()

	// The person function owns one persisted value, the visit counter.
	person := &Person{
		Visits: statefun.ValueSpec{
			Name:      "visits",
			ValueType: statefun.Int32Type,
		},
	}

	if err := builder.WithSpec(statefun.StatefulFunctionSpec{
		FunctionType: PersonTypeName,
		States:       []statefun.ValueSpec{person.Visits},
		Function:     person,
	}); err != nil {
		log.Fatalf("registering person function: %v", err)
	}

	if err := builder.WithSpec(statefun.StatefulFunctionSpec{
		FunctionType: GreeterTypeName,
		Function:     statefun.StatefulFunctionPointer(Greeter),
	}); err != nil {
		log.Fatalf("registering greeter function: %v", err)
	}

	http.Handle("/statefun", builder.AsHandler())

	// ListenAndServe only returns on failure (for example when the port is
	// already in use), and the returned error is always non-nil.
	log.Fatal(http.ListenAndServe(":8000", nil))
}