in rocketmq-knative/source/pkg/adapter/adapter.go [53:89]
func (a *Adapter) Start(ctx context.Context) error {
var err error
if a.ceClient == nil {
if a.ceClient, err = kncloudevents.NewDefaultClient(a.SinkURI); err != nil {
return fmt.Errorf("failed to create cloudevent client: %s", err.Error())
}
}
if(a.SecretName != "") {
secret, err := a.K8sClient.CoreV1().Secrets(a.Namespace).Get(a.SecretName, v1.GetOptions{})
if err != nil {
fmt.Errorf("Failed to get secret %s", err.Error())
return err
}
cred := &v1alpha1.Credentials{}
err = json.Unmarshal(secret.Data[a.SecretKey], cred)
if err != nil {
fmt.Errorf("Failed to get secret %s", err.Error())
return err
}
a.pushConsumer, _ = rocketmq.NewPushConsumer(
consumer.WithGroupName(a.GroupName),
consumer.WithNameServer([] string {cred.Url}),
consumer.WithCredentials(primitive.Credentials{
AccessKey: cred.AccessKeyId,
SecretKey: cred.AccessKeySecret,
}),
consumer.WithNamespace(a.RNamespace),
)
}else {
a.pushConsumer, _ = rocketmq.NewPushConsumer(
consumer.WithGroupName(a.GroupName),
consumer.WithNameServer([] string {a.NamesrvAddr}),
)
}
return a.consumerStart()
}