public Representation post()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/rest/resources/TopicManagementRestletResource.java [115:178]


  /**
   * Handles a POST request that whitelists (adds) a topic.
   *
   * <p>Two modes are supported, selected by whether {@code _managerControllerHelix} is set:
   * <ul>
   *   <li><b>Federated mode</b> ({@code _managerControllerHelix != null}): the query parameters
   *       {@code src}, {@code dst} and {@code routeid} are required; the topic named by the
   *       {@code topicName} request attribute is handed to
   *       {@code handleTopicAssignmentEvent(..., "ONLINE")}.</li>
   *   <li><b>Non-federated mode</b>: the topic/partition info is taken from the JSON request
   *       body when one is present; otherwise, if a {@code topicName} attribute and a source
   *       broker observer are both available, it is looked up via the observer.</li>
   * </ul>
   *
   * <p>Response status set by this method: {@code CLIENT_ERROR_BAD_REQUEST} for missing
   * parameters, a missing request body that cannot be substituted by an observer lookup, a
   * topic unknown to the source observer, or a failed federated assignment;
   * {@code CLIENT_ERROR_NOT_FOUND} when the topic is already present (kept as-is for
   * compatibility with existing clients); {@code SERVER_ERROR_INTERNAL} when reading the
   * request body throws an {@link IOException}.
   *
   * @param entity the request body; may be {@code null} or empty
   * @return a text representation describing the outcome
   */
  public Representation post(Representation entity) {
    try {
      final String topicName = (String) getRequest().getAttributes().get("topicName");
      LOGGER.info("received request to whitelist topic {} on mm ", topicName);
      if (_managerControllerHelix != null) {
        // federated mode
        Form params = getRequest().getResourceRef().getQueryAsForm();
        String srcCluster = params.getFirstValue("src");
        String dstCluster = params.getFirstValue("dst");
        String routeId = params.getFirstValue("routeid");
        if (srcCluster == null || dstCluster == null || routeId == null) {
          getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
          return new StringRepresentation("Missing parameters for whitelisting topic " + topicName
              + " in federated uReplicator");
        }
        if (!_managerControllerHelix.handleTopicAssignmentEvent(topicName, srcCluster, dstCluster, routeId, "ONLINE")) {
          getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
          String resp = String.format("Failed to add new topic: %s, src=%s, dst=%s, routeid=%s",
              topicName, srcCluster, dstCluster, routeId);
          LOGGER.info(resp);
          return new StringRepresentation(resp);
        }
        String resp = String.format("Successfully add new topic: %s, src=%s, dst=%s, routeid=%s",
            topicName, srcCluster, dstCluster, routeId);
        LOGGER.info(resp);
        return new StringRepresentation(resp);
      }

      // A POST may arrive without any entity; guard so we do not dereference null.
      String jsonRequest = (entity == null) ? null : entity.getText();
      boolean hasJsonBody = jsonRequest != null && !jsonRequest.isEmpty();

      TopicPartition topicPartitionInfo;
      if (hasJsonBody) {
        topicPartitionInfo = TopicPartition.init(jsonRequest);
      } else if (topicName != null && _srcKafkaBrokerTopicObserver != null) {
        // Only triggered when srcKafkaObserver is there and curl call has no json blob.
        topicPartitionInfo = _srcKafkaBrokerTopicObserver.getTopicPartitionWithRefresh(topicName);
        if (topicPartitionInfo == null) {
          LOGGER.warn("failed to whitelist topic {} on mm because of not exists in src cluster", topicName);
          getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
          return new StringRepresentation(String.format(
              "Failed to add new topic: %s, it's not exsited in source kafka cluster!", topicName));
        }
      } else {
        // No JSON body and no way to look the topic up: this is a client error, so report it
        // explicitly instead of passing a null/empty string on to TopicPartition.init.
        LOGGER.warn("failed to whitelist topic {} on mm because no topic info was provided", topicName);
        getResponse().setStatus(Status.CLIENT_ERROR_BAD_REQUEST);
        return new StringRepresentation(String.format(
            "Failed to add new topic: %s, no topic info in request body and source cluster lookup is unavailable",
            topicName));
      }

      // Use the resolved topic for both the action and the logs; the URL attribute may be
      // null or differ when the topic was supplied through the JSON body.
      final String resolvedTopic = topicPartitionInfo.getTopic();
      if (_autoTopicWhitelistingManager != null) {
        _autoTopicWhitelistingManager.removeFromBlacklist(resolvedTopic);
      }
      if (_helixMirrorMakerManager.isTopicExisted(resolvedTopic)) {
        LOGGER.info("topic {} already on mm", resolvedTopic);
        getResponse().setStatus(Status.CLIENT_ERROR_NOT_FOUND);
        return new StringRepresentation(String.format(
            "Failed to add new topic: %s, it is already existed!", resolvedTopic));
      }
      _helixMirrorMakerManager.addTopicToMirrorMaker(topicPartitionInfo);
      LOGGER.info("successuflly whitelist the topic {}", resolvedTopic);
      return new StringRepresentation(
          String.format("Successfully add new topic: %s", topicPartitionInfo));
    } catch (IOException e) {
      LOGGER.error("Got error during processing Post request", e);
      getResponse().setStatus(Status.SERVER_ERROR_INTERNAL);
      return new StringRepresentation(
          String.format("Failed to add new topic, with exception: %s", e));
    }
  }