public GatherNodesStream()

in solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java [129:326]


  public GatherNodesStream(StreamExpression expression, StreamFactory factory) throws IOException {

    String collectionName = factory.getValueOperand(expression, 0);
    List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
    StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");

    List<StreamExpression> streamExpressions =
        factory.getExpressionOperandsRepresentingTypes(
            expression, Expressible.class, TupleStream.class);
    // Collection Name
    if (null == collectionName) {
      throw new IOException(
          String.format(
              Locale.ROOT,
              "invalid expression %s - collectionName expected as first operand",
              expression));
    }

    Set<Traversal.Scatter> scatter = new HashSet<>();

    StreamExpressionNamedParameter scatterExpression =
        factory.getNamedOperand(expression, "scatter");

    if (scatterExpression == null) {
      scatter.add(Traversal.Scatter.LEAVES);
    } else {
      String s = ((StreamExpressionValue) scatterExpression.getParameter()).getValue();
      String[] sArray = s.split(",");
      for (String sv : sArray) {
        sv = sv.trim();
        if (Traversal.Scatter.BRANCHES.toString().equalsIgnoreCase(sv)) {
          scatter.add(Traversal.Scatter.BRANCHES);
        } else if (Traversal.Scatter.LEAVES.toString().equalsIgnoreCase(sv)) {
          scatter.add(Traversal.Scatter.LEAVES);
        }
      }
    }

    String gather = null;
    StreamExpressionNamedParameter gatherExpression = factory.getNamedOperand(expression, "gather");

    if (gatherExpression == null) {
      throw new IOException(
          String.format(Locale.ROOT, "invalid expression %s - from param is required", expression));
    } else {
      gather = ((StreamExpressionValue) gatherExpression.getParameter()).getValue();
    }

    String traverseFrom = null;
    String traverseTo = null;
    StreamExpressionNamedParameter edgeExpression = factory.getNamedOperand(expression, "walk");

    TupleStream stream = null;

    if (edgeExpression == null) {
      throw new IOException(
          String.format(Locale.ROOT, "invalid expression %s - walk param is required", expression));
    } else {
      if (streamExpressions.size() > 0) {
        stream = factory.constructStream(streamExpressions.get(0));
        String edge = ((StreamExpressionValue) edgeExpression.getParameter()).getValue();
        String[] fields = edge.split("->");
        if (fields.length != 2) {
          throw new IOException(
              String.format(
                  Locale.ROOT,
                  "invalid expression %s - walk param separated by an -> and must contain two fields",
                  expression));
        }
        traverseFrom = fields[0].trim();
        traverseTo = fields[1].trim();
      } else {
        String edge = ((StreamExpressionValue) edgeExpression.getParameter()).getValue();
        String[] fields = edge.split("->");
        if (fields.length != 2) {
          throw new IOException(
              String.format(
                  Locale.ROOT,
                  "invalid expression %s - walk param separated by an -> and must contain two fields",
                  expression));
        }

        String[] rootNodes = fields[0].split(",");
        List<String> l = new ArrayList<>();
        for (String n : rootNodes) {
          l.add(n.trim());
        }

        stream = new NodeStream(l);
        traverseFrom = "node";
        traverseTo = fields[1].trim();
      }
    }

    List<StreamExpression> metricExpressions =
        factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, Metric.class);
    List<Metric> metrics = new ArrayList<>();
    for (int idx = 0; idx < metricExpressions.size(); ++idx) {
      metrics.add(factory.constructMetric(metricExpressions.get(idx)));
    }

    boolean trackTraversal = false;

    StreamExpressionNamedParameter trackExpression =
        factory.getNamedOperand(expression, "trackTraversal");

    if (trackExpression != null) {
      trackTraversal =
          Boolean.parseBoolean(((StreamExpressionValue) trackExpression.getParameter()).getValue());
    } else {
      useDefaultTraversal = true;
    }

    StreamExpressionNamedParameter windowExpression = factory.getNamedOperand(expression, "window");
    int timeWindow = Integer.MIN_VALUE;
    int intervalParam = -1;

    if (windowExpression != null) {
      String windowValue = ((StreamExpressionValue) windowExpression.getParameter()).getValue();
      if (windowValue.contains("WEEKDAY")) {
        intervalParam = WEEK_DAY_INTERVAL;
        timeWindow = Integer.parseInt(windowValue.split("WEEKDAY")[0]);
      } else if (windowValue.contains("DAY")) {
        intervalParam = DAY_INTERVAL;
        timeWindow = Integer.parseInt(windowValue.split("DAY")[0]);
      } else {
        intervalParam = TEN_SECOND_INTERVAL;
        timeWindow = Integer.parseInt(windowValue);
      }
    }

    StreamExpressionNamedParameter lagExpression = factory.getNamedOperand(expression, "lag");
    int timeLag = 0;

    if (lagExpression != null) {
      timeLag = Integer.parseInt(((StreamExpressionValue) lagExpression.getParameter()).getValue());
    }

    StreamExpressionNamedParameter docFreqExpression =
        factory.getNamedOperand(expression, "maxDocFreq");
    int docFreq = -1;

    if (docFreqExpression != null) {
      docFreq =
          Integer.parseInt(((StreamExpressionValue) docFreqExpression.getParameter()).getValue());
    }

    Map<String, String> params = new HashMap<String, String>();
    for (StreamExpressionNamedParameter namedParam : namedParams) {
      if (!namedParam.getName().equals("zkHost")
          && !namedParam.getName().equals("gather")
          && !namedParam.getName().equals("walk")
          && !namedParam.getName().equals("scatter")
          && !namedParam.getName().equals("maxDocFreq")
          && !namedParam.getName().equals("trackTraversal")
          && !namedParam.getName().equals("window")
          && !namedParam.getName().equals("lag")) {
        params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
      }
    }

    // zkHost, optional - if not provided then will look into factory list to get
    String zkHost = null;
    if (null == zkHostExpression) {
      zkHost = factory.getCollectionZkHost(collectionName);
      if (zkHost == null) {
        zkHost = factory.getDefaultZkHost();
      }
    } else if (zkHostExpression.getParameter() instanceof StreamExpressionValue) {
      zkHost = ((StreamExpressionValue) zkHostExpression.getParameter()).getValue();
    }

    if (null == zkHost) {
      throw new IOException(
          String.format(
              Locale.ROOT,
              "invalid expression %s - zkHost not found for collection '%s'",
              expression,
              collectionName));
    }

    // We've got all the required items
    init(
        zkHost,
        collectionName,
        stream,
        traverseFrom,
        traverseTo,
        gather,
        params,
        metrics,
        trackTraversal,
        scatter,
        docFreq,
        timeWindow,
        timeLag,
        intervalParam);
  }