public Representation post()

in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/rest/resources/TopicManagementRestletResource.java [148:219]


  public Representation post(Representation entity) {
    final String topicName = (String) getRequest().getAttributes().get("topicName");
    Form queryParams = getRequest().getResourceRef().getQueryAsForm();
    String srcCluster = queryParams.getFirstValue("src");
    String dstCluster = queryParams.getFirstValue("dst");

    LOGGER.info("Received request to whitelist topic {} from {} to {} on uReplicator ",
        topicName, srcCluster, dstCluster);

    // TODO: validate src->dst combination
    if (!isValidPipeline(srcCluster, dstCluster)) {
      LOGGER.warn(
          "Failed to whitelist topic {} on uReplicator because of not valid pipeline from {} to {}",
          topicName, srcCluster, dstCluster);
      return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_NOT_FOUND,
          String.format(
              "Failed to whitelist topic %s on uReplicator because of not valid pipeline from %s to %s",
              topicName, srcCluster, dstCluster));
    }

    // Check if topic in source cluster
    TopicPartition srcTopicPartitionInfo = _clusterToObserverMap.get(srcCluster)
        .getTopicPartitionWithRefresh(topicName);
    LOGGER.info("Source topicPartitionInfo: {}", srcTopicPartitionInfo);
    if (srcTopicPartitionInfo == null) {
      LOGGER
          .warn("Failed to whitelist topic {} on uReplicator because of not exists in src cluster",
              topicName);
      return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_NOT_FOUND,
          String.format(
              "Failed to whitelist new topic: %s, it's not existed in source Kafka cluster!",
              topicName));
    }

    // Check if topic in destination cluster
    TopicPartition dstTopicPartitionInfo = _clusterToObserverMap.get(dstCluster)
        .getTopicPartitionWithRefresh(topicName);
    LOGGER.info("Destination topicPartitionInfo: {}", dstTopicPartitionInfo);
    if (dstTopicPartitionInfo == null) {
      LOGGER
          .warn("Failed to whitelist topic {} on uReplicator because of not exists in dst cluster",
              topicName);
      return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_NOT_FOUND,
          String.format(
              "Failed to whitelist new topic: %s, it's not existed in destination Kafka cluster!",
              topicName));
    }
    _helixMirrorMakerManager.updateCurrentStatus();
    String pipeline = ControllerUtils.getPipelineName(srcCluster, dstCluster);
    if (_helixMirrorMakerManager.isTopicPipelineExisted(topicName, pipeline)) {
      LOGGER.info("Topic {} already on uReplicator in pipeline {}", topicName, pipeline);
      return getResponseJsonStringRepresentation(Status.SUCCESS_OK,
          String.format("Failed to add new topic: %s from: %s to: %s, it is already existed!",
              topicName, srcCluster, dstCluster));
    } else {
      try {
        _helixMirrorMakerManager.addTopicToMirrorMaker(srcTopicPartitionInfo.getTopic(),
            srcTopicPartitionInfo.getPartition(), srcCluster, dstCluster, pipeline);
        LOGGER.info("Successfully whitelist the topic {} from {} to {}", topicName, srcCluster,
            dstCluster);
        return getResponseJsonStringRepresentation(Status.SUCCESS_OK,
            String.format("Successfully add new topic: %s from: %s to: %s",
                srcTopicPartitionInfo.getTopic(), srcCluster, dstCluster));
      } catch (Exception e) {
        LOGGER.info("Failed to whitelist the topic {} from {} to {} due to exception {}",
            topicName, srcCluster, dstCluster, e);
        return getResponseJsonStringRepresentation(Status.SERVER_ERROR_INTERNAL,
            String.format("Failed add new topic: %s from: %s to: %s due to exception: %s",
                srcTopicPartitionInfo.getTopic(), srcCluster, dstCluster, e.toString()));
      }
    }
  }