func main()

in rocketmq-knative/source/cmd/receive_adapter/main.go [48:84]


func main() {
	flag.Parse()
	ctx := context.Background()
	logCfg := zap.NewProductionConfig()
	logCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
	logger, err := logCfg.Build()
	if err != nil {
		log.Fatalf("Unable to create logger: %v", err)
	}

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		logger.Fatal("Failed to process env var", zap.Error(err))
	}
	client, err := newKubernetesClient()
	if err != nil {
		logger.Fatal("Failed to initialize kubernetes client: ", zap.Error(err))
	}

	adapter := &rocketmq.Adapter{
		Namespace:      env.Namespace,
		SecretName:     env.SecretName,
		SecretKey:      env.SecretKey,
		K8sClient:      client,
		Topic:      env.Topic,
		NamesrvAddr:       env.NamesrvAddr,
		RNamespace: env.RNamespace,
		GroupName: env.GroupName,
		SinkURI:        env.Sink,
		SubscriptionID: env.Subscription,
	}

	logger.Info("Starting RocketMQ Receive Adapter. %v", zap.Reflect("adapter: ", adapter))
	if err := adapter.Start(ctx); err != nil {
		logger.Fatal("failed to start adapter: ", zap.Error(err))
	}
}