func()

in rocketmq-knative/source/pkg/adapter/adapter.go [53:89]


// Start prepares the adapter's clients and then begins consuming.
//
// It performs three steps in order:
//
//  1. If a.ceClient is nil, a default CloudEvents client is created for
//     a.SinkURI.
//  2. A RocketMQ push consumer is built and stored in a.pushConsumer. When
//     a.SecretName is non-empty, the Kubernetes secret of that name is read
//     from a.Namespace, the value stored under a.SecretKey is decoded as JSON
//     into v1alpha1.Credentials, and the consumer is configured with the
//     credentials' URL as name server, its access key pair, and a.RNamespace.
//     Otherwise the consumer is configured with a.NamesrvAddr only.
//  3. a.consumerStart is invoked and its result is returned.
//
// Every failure is returned to the caller wrapped with a description of the
// step that failed. ctx is not consulted by this method itself.
func (a *Adapter) Start(ctx context.Context) error {
	var err error

	if a.ceClient == nil {
		if a.ceClient, err = kncloudevents.NewDefaultClient(a.SinkURI); err != nil {
			return fmt.Errorf("failed to create cloudevent client: %w", err)
		}
	}

	if a.SecretName != "" {
		secret, err := a.K8sClient.CoreV1().Secrets(a.Namespace).Get(a.SecretName, v1.GetOptions{})
		if err != nil {
			return fmt.Errorf("failed to get secret %s/%s: %w", a.Namespace, a.SecretName, err)
		}

		// Report a missing key explicitly; otherwise the caller only sees an
		// opaque JSON "unexpected end of input" error from decoding nil bytes.
		raw, ok := secret.Data[a.SecretKey]
		if !ok {
			return fmt.Errorf("secret %s/%s has no key %q", a.Namespace, a.SecretName, a.SecretKey)
		}

		cred := &v1alpha1.Credentials{}
		if err := json.Unmarshal(raw, cred); err != nil {
			return fmt.Errorf("failed to parse credentials from secret %s/%s key %q: %w",
				a.Namespace, a.SecretName, a.SecretKey, err)
		}

		a.pushConsumer, err = rocketmq.NewPushConsumer(
			consumer.WithGroupName(a.GroupName),
			consumer.WithNameServer([]string{cred.Url}),
			consumer.WithCredentials(primitive.Credentials{
				AccessKey: cred.AccessKeyId,
				SecretKey: cred.AccessKeySecret,
			}),
			consumer.WithNamespace(a.RNamespace),
		)
		if err != nil {
			return fmt.Errorf("failed to create rocketmq push consumer: %w", err)
		}
	} else {
		a.pushConsumer, err = rocketmq.NewPushConsumer(
			consumer.WithGroupName(a.GroupName),
			consumer.WithNameServer([]string{a.NamesrvAddr}),
		)
		if err != nil {
			return fmt.Errorf("failed to create rocketmq push consumer: %w", err)
		}
	}

	// Only reached once the consumer was constructed without error.
	return a.consumerStart()
}