in image/resources/knfsd-fsidd/main.go [103:257]
func run(ctx context.Context, cfg *Config) error {
var err error
m := metrics.Start(ctx, cfg.Metrics)
defer func() {
deadline, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := m.Shutdown(deadline)
if err != nil {
log.Warn.Printf("metrics did not shutdown gracefully: %s", err)
}
}()
db, err := connect(ctx, cfg.Database)
if err != nil {
return err
}
defer db.Close()
source := FSIDSource{
db: db,
tableName: cfg.Database.TableName,
}
if cfg.Database.CreateTable {
err = source.CreateTable(ctx)
if err != nil {
return err
}
}
var f FSIDProvider
if cfg.Cache {
f = &FSIDCache{source: source}
} else {
f = source
}
s, err := resolveSocket(cfg.SocketPath)
if err != nil {
return err
}
defer s.Close()
s.Handle("get_fsidnum", func(ctx context.Context, path string) (string, error) {
rec := metrics.StartRequest("get_fsidnum")
if path == "" {
rec.End(ctx, "error")
return "", ErrInvalidArgument
}
var fsid int32
err := withRetry(ctx, func() error {
var err error
rec := rec.StartOperation()
fsid, err = f.GetFSID(ctx, path)
rec.End(ctx, SQLMetricResult(err))
return err
})
rec.End(ctx, SQLMetricResult(err))
if err == nil {
return strconv.FormatInt(int64(fsid), 10), nil
} else if IsNotFound(err) {
return "", nil
} else {
return "", err
}
})
s.Handle("get_or_create_fsidnum", func(ctx context.Context, path string) (string, error) {
rec := metrics.StartRequest("get_or_create_fsidnum")
if path == "" {
rec.End(ctx, "error")
return "", ErrInvalidArgument
}
var fsid int32
err = withRetry(ctx, func() error {
var err error
rec := rec.StartOperation()
fsid, err = f.GetFSID(ctx, path)
if errors.Is(err, pgx.ErrNoRows) {
// FSID not found for path, so try and allocate one.
// This might fail with a 23505 unique_violation if the path has
// already been allocated an FSID by different process. withRetry
// will then retry this whole block and will find the FSID
// allocated by the other process.
fsid, err = f.AllocateFSID(ctx, path)
}
rec.End(ctx, SQLMetricResult(err))
return err
})
rec.End(ctx, SQLMetricResult(err))
return strconv.FormatInt(int64(fsid), 10), err
})
s.Handle("get_path", func(ctx context.Context, arg string) (string, error) {
rec := metrics.StartRequest("get_path")
fsid, err := strconv.ParseInt(arg, 10, 32)
if err != nil {
rec.End(ctx, "error")
return "", ErrInvalidArgument
}
if fsid < 1 {
rec.End(ctx, "error")
return "", ErrInvalidArgument
}
var path string
err = withRetry(ctx, func() error {
var err error
rec := rec.StartOperation()
path, err = f.GetPath(ctx, int32(fsid))
rec.End(ctx, SQLMetricResult(err))
return err
})
rec.End(ctx, SQLMetricResult(err))
return path, err
})
s.Handle("version", func(ctx context.Context, arg string) (string, error) {
metrics.Request(ctx, "version", "ok", 0, 0)
return "1", nil
})
go func() {
<-ctx.Done()
_, err := daemon.SdNotify(false, daemon.SdNotifyStopping)
if err != nil {
log.Error.Print(err)
}
deadline, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
s.Shutdown(deadline)
s.Close()
}()
_, err = daemon.SdNotify(false, daemon.SdNotifyReady)
if err != nil {
return err
}
log.Info.Print("service ready")
err = s.Serve()
if errors.Is(err, ErrServerClosed) {
err = nil
}
return err
}