public Representation put()

in uReplicator-Manager/src/main/java/com/uber/stream/kafka/mirrormaker/manager/rest/resources/TopicManagementRestletResource.java [223:261]


  /**
   * Handles a PUT request asking to expand a topic in an existing pipeline to a new number of
   * partitions.
   *
   * <p>The topic name is read from the {@code topicName} request attribute; the source cluster,
   * destination cluster and target partition count are read from the {@code src}, {@code dst}
   * and {@code partitions} query parameters.
   *
   * <p>Responses:
   * <ul>
   *   <li>{@code 400 Bad Request} when {@code src} or {@code dst} is missing/empty, or when
   *       {@code partitions} is missing, not an integer, or not positive;</li>
   *   <li>{@code 404 Not Found} when the topic does not exist in the pipeline, or when the
   *       expansion itself throws;</li>
   *   <li>{@code 200 OK} when the expansion call returns normally.</li>
   * </ul>
   *
   * @param entity the request body; it is not read by this handler
   * @return a JSON string representation carrying the status and a human-readable message
   */
  public Representation put(Representation entity) {
    final String topicName = (String) getRequest().getAttributes().get("topicName");
    final Form queryParams = getRequest().getResourceRef().getQueryAsForm();
    final String srcCluster = queryParams.getFirstValue("src");
    final String dstCluster = queryParams.getFirstValue("dst");
    final String newNumPartitions = queryParams.getFirstValue("partitions");

    LOGGER.info("Received request to expand topic {} from {} to {} to {} partitions on uReplicator",
        topicName, srcCluster, dstCluster, newNumPartitions);

    // Reject malformed requests up front, before refreshing state or building a pipeline name
    // out of missing values.
    if (srcCluster == null || srcCluster.isEmpty()
        || dstCluster == null || dstCluster.isEmpty()) {
      LOGGER.info("Missing src or dst for topic {}, abandon expanding topic", topicName);
      return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_BAD_REQUEST,
          String.format(
              "Missing parameter src or dst (src=%s, dst=%s), abandon expanding topic %s!",
              srcCluster, dstCluster, topicName));
    }

    final int numPartitions;
    try {
      // Integer.parseInt(null) also throws NumberFormatException, so an absent parameter is
      // handled by the same branch as a non-numeric one.
      numPartitions = Integer.parseInt(newNumPartitions);
    } catch (NumberFormatException e) {
      LOGGER.info("Invalid partitions value {} for topic {}, abandon expanding topic",
          newNumPartitions, topicName);
      return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_BAD_REQUEST,
          String.format(
              "Parameter partitions must be a positive integer but was %s, abandon expanding topic %s!",
              newNumPartitions, topicName));
    }
    if (numPartitions <= 0) {
      LOGGER.info("Non-positive partitions value {} for topic {}, abandon expanding topic",
          newNumPartitions, topicName);
      return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_BAD_REQUEST,
          String.format(
              "Parameter partitions must be a positive integer but was %s, abandon expanding topic %s!",
              newNumPartitions, topicName));
    }

    _helixMirrorMakerManager.updateCurrentStatus();
    final String pipeline = ControllerUtils.getPipelineName(srcCluster, dstCluster);
    if (!_helixMirrorMakerManager.isTopicPipelineExisted(topicName, pipeline)) {
      LOGGER.info("Topic {} doesn't exist in pipeline {}, abandon expanding topic", topicName,
          pipeline);
      return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_NOT_FOUND,
          String
              .format("Topic %s doesn't exist in pipeline %s, abandon expanding topic!", topicName,
                  pipeline));
    }

    try {
      _helixMirrorMakerManager.expandTopicInMirrorMaker(topicName, srcCluster, pipeline,
          numPartitions);
      LOGGER.info("Successfully expand the topic {} in pipeline {} to {} partitions",
          topicName, pipeline, newNumPartitions);
      return getResponseJsonStringRepresentation(Status.SUCCESS_OK,
          String.format("Successfully expand the topic %s in pipeline %s to %s partitions",
              topicName, pipeline, newNumPartitions));
    } catch (Exception e) {
      LOGGER.error(String.format(
          "Failed to expand the topic %s in pipeline %s to %s partitions due to exception",
          topicName, pipeline, newNumPartitions), e);
      // NOTE(review): a failure inside the expansion is reported as 404 rather than a 5xx
      // status. Kept as-is because existing clients may rely on it — confirm before changing.
      return getResponseJsonStringRepresentation(Status.CLIENT_ERROR_NOT_FOUND,
          String.format(
              "Failed to expand the topic %s in pipeline %s to %s partitions due to exception: %s",
              topicName, pipeline, newNumPartitions, e.toString()));
    }
  }