in pkg/controller/topictransfer/topictransfer_controller.go [109:289]
// Reconcile transfers the topic named in a TopicTransfer object from its
// source cluster to its target cluster by running admin tool commands.
//
// The transfer runs as seven ordered steps; the package-level status variable
// records the step currently in progress:
//
//  1. add every consumer group of the topic to the target cluster
//  2. add the topic to the target cluster
//  3. stop writes to the topic in the source cluster
//  4. wait until every consumer group has consumed the source cluster backlog
//  5. delete the topic in the source cluster
//  6. delete the consumer groups in the source cluster
//  7. create the retry topics in the target cluster
//
// When a step fails, the package-level undo flag is set and the request is
// requeued. The next invocation then rolls back, based on status, instead of
// transferring, and clears the flag so that later requests can transfer again.
//
// It returns an empty result and a nil error when the object no longer exists,
// when no name server is available, after a rollback, and after a successful
// transfer. An error from reading the object is returned unchanged.
func (r *ReconcileTopicTransfer) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
	reqLogger.Info("Reconciling TopicTransfer")

	// Fetch the TopicTransfer object, honouring the caller's context.
	topicTransfer := &rocketmqv1alpha1.TopicTransfer{}
	if err := r.client.Get(ctx, request.NamespacedName, topicTransfer); err != nil {
		if errors.IsNotFound(err) {
			// Request object not found, could have been deleted after reconcile request.
			// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
			// Return and don't requeue.
			return reconcile.Result{}, nil
		}
		// Error reading the object - requeue the request.
		return reconcile.Result{}, err
	}

	topic := topicTransfer.Spec.Topic
	targetCluster := topicTransfer.Spec.TargetCluster
	sourceCluster := topicTransfer.Spec.SourceCluster

	// Only the first entry of the name server list is used.
	nameServer := strings.Split(share.NameServersStr, ";")[0]
	if len(nameServer) < cons.MinIpListLength {
		reqLogger.Info("There is no available name server now thus the topic transfer process is terminated.")
		// Terminate the transfer process.
		return reconcile.Result{}, nil
	}

	// The consumer groups are derived from the topic.
	consumerGroups := getConsumerGroupByTopic(topic, nameServer)
	if len(consumerGroups) == 0 {
		// Covers both a nil and an empty result; the transfer still proceeds.
		reqLogger.Info("There is no consumer group of topic " + topic)
	}

	if undo {
		// Undo the operations for atomicity. The cases fall through so that
		// every step up to the recorded one is rolled back.
		reqLogger.Info("Transfer topic " + topic + " from " + sourceCluster + " to " + targetCluster + " failed, rolling back...")
		switch status {
		case 7:
			fallthrough
		case 6:
			undoDeleteConsumeGroup(consumerGroups, sourceCluster, nameServer)
			fallthrough
		case 5:
			undoDeleteTopic(topic, sourceCluster, nameServer)
			fallthrough
		case 4:
			fallthrough
		case 3:
			undoStopWrite(topic, sourceCluster, nameServer)
		default:
			// For user data safety, no special operations for other status.
		}
		// The rollback has been applied. Clear the flag, otherwise every later
		// reconcile would roll back again and no transfer could ever run.
		undo = false
		return reconcile.Result{}, nil
	}

	// runAdminCommand runs one admin tool command and returns its standard output.
	runAdminCommand := func(command string) (string, error) {
		output, err := exec.Command(cons.BasicCommand, cons.AdminToolDir, command).Output()
		return string(output), err
	}

	// abort logs a failed step, arms the rollback for the next invocation and
	// asks for the request to be requeued.
	abort := func(err error, msg string) (reconcile.Result, error) {
		reqLogger.Error(err, msg)
		undo = true
		return reconcile.Result{Requeue: true}, err
	}

	// step1: add all consumer groups to target cluster
	status = 1
	for i, consumerGroup := range consumerGroups {
		log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
		addConsumerGroupToTargetClusterCommand := buildAddConsumerGroupToClusterCommand(consumerGroup, targetCluster, nameServer)
		reqLogger.Info("AddConsumerGroupToTargetClusterCommand: " + addConsumerGroupToTargetClusterCommand)
		output, err := runAdminCommand(addConsumerGroupToTargetClusterCommand)
		if err != nil || !isUpdateConsumerGroupSuccess(output) {
			return abort(err, "Failed to add ConsumerGroup "+consumerGroup+" to TargetCluster "+targetCluster+" with output: "+output)
		}
		reqLogger.Info("Successfully add ConsumerGroup " + consumerGroup + " to TargetCluster " + targetCluster + " with output: " + output)
	}

	// step2: add topic to target cluster
	status = 2
	addTopicToTargetClusterCommand := buildAddTopicToClusterCommand(topic, targetCluster, nameServer)
	reqLogger.Info("addTopicToTargetClusterCommand: " + addTopicToTargetClusterCommand)
	output, err := runAdminCommand(addTopicToTargetClusterCommand)
	if err != nil || !isUpdateTopicCommandSuccess(output) {
		return abort(err, "Failed to add Topic "+topic+" to TargetCluster "+targetCluster+" with output: "+output)
	}
	reqLogger.Info("Successfully add Topic " + topic + " to TargetCluster " + targetCluster + " with output: " + output)

	// step3: stop write in source cluster topic
	status = 3
	stopSourceClusterTopicWriteCommand := buildStopClusterTopicWriteCommand(topic, sourceCluster, nameServer)
	reqLogger.Info("stopSourceClusterTopicWriteCommand: " + stopSourceClusterTopicWriteCommand)
	output, err = runAdminCommand(stopSourceClusterTopicWriteCommand)
	if err != nil || !isUpdateTopicCommandSuccess(output) {
		return abort(err, "Failed to stop Topic "+topic+" write in SourceCluster "+sourceCluster+" with output: "+output)
	}
	reqLogger.Info("Successfully stop Topic " + topic + " write in SourceCluster " + sourceCluster + " with output: " + output)

	// step4: check source cluster unconsumed message
	status = 4
	for i, consumerGroup := range consumerGroups {
		log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
		// Poll until this consumer group has drained the source cluster.
		for {
			checkConsumeProgressCommand := buildCheckConsumeProgressCommand(consumerGroup, nameServer)
			reqLogger.Info("checkConsumeProgressCommand: " + checkConsumeProgressCommand)
			output, err = runAdminCommand(checkConsumeProgressCommand)
			if err != nil || !isCheckConsumeProcessCommandSuccess(output) {
				return abort(err, "Failed to check consumerGroup "+consumerGroup+" with output: "+output)
			}
			reqLogger.Info(" output: " + output)
			if isConsumeFinished(output, topic, sourceCluster) {
				reqLogger.Info("Message consumption of " + consumerGroup + " in source cluster " + sourceCluster + " finished!")
				break
			}
			reqLogger.Info("Wait a moment for message consumption of " + consumerGroup + " in source cluster " + sourceCluster + " finish ...")
			// Wait for the next poll, but give up as soon as the context is
			// cancelled so the worker is not blocked indefinitely. Cancellation
			// is treated like any other step 4 failure.
			select {
			case <-ctx.Done():
				return abort(ctx.Err(), "Interrupted while waiting for message consumption of "+consumerGroup+" in source cluster "+sourceCluster)
			case <-time.After(time.Duration(cons.CheckConsumeFinishIntervalInSecond) * time.Second):
			}
		}
	}

	// step5: delete topic in source cluster
	status = 5
	deleteSourceClusterTopicCommand := buildDeleteSourceClusterTopicCommand(topic, sourceCluster, nameServer)
	reqLogger.Info("deleteSourceClusterTopicCommand: " + deleteSourceClusterTopicCommand)
	output, err = runAdminCommand(deleteSourceClusterTopicCommand)
	if err != nil || !isDeleteTopicCommandSuccess(output) {
		return abort(err, "Failed to delete Topic "+topic+" in SourceCluster "+sourceCluster+" with output: "+output)
	}
	reqLogger.Info("Successfully delete Topic " + topic + " in SourceCluster " + sourceCluster + " with output: " + output)

	// step6: delete consumer group in source cluster
	status = 6
	for i, consumerGroup := range consumerGroups {
		log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
		deleteConsumerGroupCommand := buildDeleteConsumeGroupCommand(consumerGroup, sourceCluster, nameServer)
		reqLogger.Info("deleteConsumerGroupCommand: " + deleteConsumerGroupCommand)
		output, err = runAdminCommand(deleteConsumerGroupCommand)
		if err != nil || !isDeleteConsumerGroupSuccess(output) {
			return abort(err, "Failed to delete consumer group "+consumerGroup+" in SourceCluster "+sourceCluster+" with output: "+output)
		}
		reqLogger.Info("Successfully delete consumer group " + consumerGroup + " in SourceCluster " + sourceCluster + " with output: " + output)
	}

	// step7: create retry topic
	status = 7
	for i, consumerGroup := range consumerGroups {
		log.Info("Processing consumer group" + consumerGroup + " " + strconv.Itoa(i+1) + "/" + strconv.Itoa(len(consumerGroups)))
		createRetryTopicCommand := buildAddRetryTopicToClusterCommand(consumerGroup, targetCluster, nameServer)
		reqLogger.Info("createRetryTopicCommand: " + createRetryTopicCommand)
		output, err = runAdminCommand(createRetryTopicCommand)
		if err != nil || !isUpdateTopicCommandSuccess(output) {
			return abort(err, "Failed to create retry topic of consumer group "+consumerGroup+" in TargetCluster "+targetCluster+" with output: "+output)
		}
		reqLogger.Info("Successfully create retry topic of consumer group " + consumerGroup + " in TargetCluster " + targetCluster + " with output: " + output)
	}

	reqLogger.Info("Topic " + topic + " has been successfully transferred from " + sourceCluster + " to " + targetCluster)
	return reconcile.Result{}, nil
}