in rocketmq-knative/source/cmd/receive_adapter/main.go [48:84]
// main is the entry point of the RocketMQ receive adapter binary.
//
// It parses command-line flags, builds a production zap logger that encodes
// timestamps as ISO 8601, loads the adapter settings from environment
// variables into envConfig, creates a Kubernetes client, and then starts the
// RocketMQ adapter with a background context. Any failure along the way is
// logged at Fatal level, which terminates the process.
func main() {
	flag.Parse()
	ctx := context.Background()

	logCfg := zap.NewProductionConfig()
	logCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
	logger, err := logCfg.Build()
	if err != nil {
		// The zap logger is unavailable here, so fall back to the stdlib logger.
		log.Fatalf("Unable to create logger: %v", err)
	}
	// Flush any buffered log entries when main returns normally. The error is
	// deliberately ignored: there is nothing useful left to do with it at exit.
	defer func() { _ = logger.Sync() }()

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		logger.Fatal("Failed to process env var", zap.Error(err))
	}

	client, err := newKubernetesClient()
	if err != nil {
		logger.Fatal("Failed to initialize kubernetes client: ", zap.Error(err))
	}

	adapter := &rocketmq.Adapter{
		Namespace:      env.Namespace,
		SecretName:     env.SecretName,
		SecretKey:      env.SecretKey,
		K8sClient:      client,
		Topic:          env.Topic,
		NamesrvAddr:    env.NamesrvAddr,
		RNamespace:     env.RNamespace,
		GroupName:      env.GroupName,
		SinkURI:        env.Sink,
		SubscriptionID: env.Subscription,
	}

	// zap's Logger.Info takes a literal message plus structured fields; it does
	// not interpret printf verbs, so the adapter is attached as a field only.
	logger.Info("Starting RocketMQ Receive Adapter", zap.Reflect("adapter: ", adapter))

	if err := adapter.Start(ctx); err != nil {
		logger.Fatal("failed to start adapter: ", zap.Error(err))
	}
}