func()

in pkg/controller/topictransfer/topictransfer_controller.go [109:289]


// Reconcile handles one TopicTransfer request: it moves a topic and its
// consumer groups from Spec.SourceCluster to Spec.TargetCluster by running
// admin tool commands, or rolls back a previously failed attempt.
//
// Flow:
//   - Fetch the TopicTransfer named by request. A missing object ends the
//     reconcile without requeue; any other read error is returned.
//   - Use the first ";"-separated entry of share.NameServersStr as the name
//     server. If it is shorter than cons.MinIpListLength the transfer is
//     skipped without requeue.
//   - If undo is set, run the rollback helpers selected by status and return.
//   - Otherwise run steps 1-7 in order, recording the current step in status.
//     A failing step sets undo and returns with Requeue set, so the next
//     reconcile takes the rollback branch.
//
// The returned error is the error of the failing command, which is nil when
// the command ran but its output did not pass validation.
//
// NOTE(review): undo and status are declared outside this function
// (presumably package-level), so they appear to be shared by every
// TopicTransfer object, and nothing here resets undo after a rollback —
// confirm this is intended.
func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
	reqLogger.Info("Reconciling TopicTransfer")

	// Fetch the TopicTransfer with the caller's context so the read is
	// cancelled together with the reconcile.
	topicTransfer := &rocketmqv1alpha1.TopicTransfer{}
	if err := r.client.Get(ctx, request.NamespacedName, topicTransfer); err != nil {
		if errors.IsNotFound(err) {
			// Request object not found, could have been deleted after reconcile request.
			// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
			// Return and don't requeue
			return reconcile.Result{}, nil
		}
		// Error reading the object - requeue the request.
		return reconcile.Result{}, err
	}

	topic := topicTransfer.Spec.Topic
	targetCluster := topicTransfer.Spec.TargetCluster
	sourceCluster := topicTransfer.Spec.SourceCluster

	nameServer := strings.Split(share.NameServersStr, ";")[0]
	if len(nameServer) < cons.MinIpListLength {
		reqLogger.Info("There is no available name server now thus the topic transfer process is terminated.")
		// terminate the transfer process
		return reconcile.Result{}, nil
	}

	// ConsumerGroup could be decided by listing the topics
	consumerGroups := getConsumerGroupByTopic(topic, nameServer)
	if len(consumerGroups) == 0 {
		reqLogger.Info("There is no consumer group of topic " + topic)
	}

	if undo {
		// undo the operations for atomicity; the cases fall through so that
		// every step up to the recorded status is reverted.
		reqLogger.Info("Transfer topic " + topic + "  from " + sourceCluster + " to " + targetCluster + " failed, rolling back...")
		switch status {
		case 7:
			fallthrough
		case 6:
			undoDeleteConsumeGroup(consumerGroups, sourceCluster, nameServer)
			fallthrough
		case 5:
			undoDeleteTopic(topic, sourceCluster, nameServer)
			fallthrough
		case 4:
			fallthrough
		case 3:
			undoStopWrite(topic, sourceCluster, nameServer)
		default:
			// for user data safety, no special operations for other status
		}
		return reconcile.Result{}, nil
	}

	// runAdminCommand executes one admin tool command and returns its stdout.
	// NOTE(review): the command string embeds values taken from the
	// TopicTransfer spec; if cons.BasicCommand is a shell, confirm those values
	// are validated before they reach this point.
	runAdminCommand := func(command string) (string, error) {
		out, err := exec.Command(cons.BasicCommand, cons.AdminToolDir, command).Output()
		return string(out), err
	}

	// abort logs a failed step, flags the transfer for rollback and produces
	// the requeue result returned to the controller.
	abort := func(err error, msg string) (reconcile.Result, error) {
		reqLogger.Error(err, msg)
		// terminate the transfer process
		undo = true
		return reconcile.Result{Requeue: true}, err
	}

	// logProgress reports which consumer group of the list is being handled.
	logProgress := func(i int, consumerGroup string) {
		reqLogger.Info("Processing consumer group " + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
	}

	// step1: add all consumer groups to target cluster
	status = 1
	for i, consumerGroup := range consumerGroups {
		logProgress(i, consumerGroup)
		command := buildAddConsumerGroupToClusterCommand(consumerGroup, targetCluster, nameServer)
		reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + command)
		output, err := runAdminCommand(command)
		// validate command output
		if err != nil || !isUpdateConsumerGroupSuccess(output) {
			return abort(err, "Failed to add ConsumerGroup "+consumerGroup+" to TargetCluster "+targetCluster+" with output: "+output)
		}
		reqLogger.Info("Successfully add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster + " with output: " + output)
	}

	// step2: add topic to target cluster
	status = 2
	addTopicCommand := buildAddTopicToClusterCommand(topic, targetCluster, nameServer)
	reqLogger.Info("addTopicToTargetClusterCommand: " + addTopicCommand)
	output, err := runAdminCommand(addTopicCommand)
	// validate command output
	if err != nil || !isUpdateTopicCommandSuccess(output) {
		return abort(err, "Failed to add Topic "+topic+" to TargetCluster "+targetCluster+" with output: "+output)
	}
	reqLogger.Info("Successfully add Topic " + topic + " to TargetCluster " + targetCluster + " with output: " + output)

	// step3: stop write in source cluster topic
	status = 3
	stopWriteCommand := buildStopClusterTopicWriteCommand(topic, sourceCluster, nameServer)
	reqLogger.Info("stopSourceClusterTopicWriteCommand: " + stopWriteCommand)
	output, err = runAdminCommand(stopWriteCommand)
	// validate command output
	if err != nil || !isUpdateTopicCommandSuccess(output) {
		return abort(err, "Failed to stop Topic "+topic+" write in SourceCluster "+sourceCluster+" with output: "+output)
	}
	reqLogger.Info("Successfully stop Topic " + topic + " write in SourceCluster " + sourceCluster + " with output: " + output)

	// step4: check source cluster unconsumed message
	status = 4
	pollInterval := time.Duration(cons.CheckConsumeFinishIntervalInSecond) * time.Second
	for i, consumerGroup := range consumerGroups {
		logProgress(i, consumerGroup)
		for {
			checkCommand := buildCheckConsumeProgressCommand(consumerGroup, nameServer)
			reqLogger.Info("checkConsumeProgressCommand: " + checkCommand)
			output, err = runAdminCommand(checkCommand)
			if err != nil || !isCheckConsumeProcessCommandSuccess(output) {
				return abort(err, "Failed to check consumerGroup "+consumerGroup+" with output: "+output)
			}
			reqLogger.Info(" output: " + output)
			if isConsumeFinished(output, topic, sourceCluster) {
				reqLogger.Info("Message consumption of " + consumerGroup + " in source cluster " + sourceCluster + " finished!")
				break
			}
			reqLogger.Info("Wait a moment for message consumption of " + consumerGroup + " in source cluster " + sourceCluster + " finish ...")

			// Wait for the next poll, but stop as soon as the reconcile context
			// is done so the worker is not blocked indefinitely. Cancellation is
			// not a transfer failure, so no rollback is requested here.
			select {
			case <-ctx.Done():
				return reconcile.Result{}, ctx.Err()
			case <-time.After(pollInterval):
			}
		}
	}

	// step5: delete topic in source cluster
	status = 5
	deleteTopicCommand := buildDeleteSourceClusterTopicCommand(topic, sourceCluster, nameServer)
	reqLogger.Info("deleteSourceClusterTopicCommand: " + deleteTopicCommand)
	output, err = runAdminCommand(deleteTopicCommand)
	if err != nil || !isDeleteTopicCommandSuccess(output) {
		return abort(err, "Failed to delete Topic "+topic+" in SourceCluster "+sourceCluster+" with output: "+output)
	}
	reqLogger.Info("Successfully delete Topic " + topic + " in SourceCluster " + sourceCluster + " with output: " + output)

	// step6: delete consumer group in source cluster
	status = 6
	for i, consumerGroup := range consumerGroups {
		logProgress(i, consumerGroup)
		deleteGroupCommand := buildDeleteConsumeGroupCommand(consumerGroup, sourceCluster, nameServer)
		reqLogger.Info("deleteConsumerGroupCommand: " + deleteGroupCommand)
		output, err = runAdminCommand(deleteGroupCommand)
		if err != nil || !isDeleteConsumerGroupSuccess(output) {
			return abort(err, "Failed to delete consumer group "+consumerGroup+" in SourceCluster "+sourceCluster+" with output: "+output)
		}
		reqLogger.Info("Successfully delete consumer group " + consumerGroup + " in SourceCluster " + sourceCluster + " with output: " + output)
	}

	// step7: create retry topic
	status = 7
	for i, consumerGroup := range consumerGroups {
		logProgress(i, consumerGroup)
		retryTopicCommand := buildAddRetryTopicToClusterCommand(consumerGroup, targetCluster, nameServer)
		reqLogger.Info("createRetryTopicCommand: " + retryTopicCommand)
		output, err = runAdminCommand(retryTopicCommand)
		if err != nil || !isUpdateTopicCommandSuccess(output) {
			return abort(err, "Failed to create retry topic of consumer group "+consumerGroup+" in TargetCluster "+targetCluster+" with output: "+output)
		}
		reqLogger.Info("Successfully create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster + " with output: " + output)
	}

	reqLogger.Info("Topic " + topic + " has been successfully transferred from " + sourceCluster + " to " + targetCluster)
	return reconcile.Result{}, nil
}