in solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/graph/GatherNodesStream.java [129:326]
/**
 * Creates the stream from a parsed streaming expression.
 *
 * <p>Operands read from {@code expression}:
 *
 * <ul>
 *   <li>first unnamed value operand: the collection name (required).
 *   <li>{@code walk}: {@code "from->to"} (required). If the expression has an inner stream
 *       operand, {@code from} is a field name and the inner stream is the traversal source.
 *       Otherwise {@code from} is a comma-separated list of root nodes wrapped in a {@link
 *       NodeStream}, and the traversal-from field is {@code "node"}.
 *   <li>{@code gather}: field name passed to {@code init} (required).
 *   <li>{@code scatter}: comma-separated, case-insensitive subset of {@code branches}/{@code
 *       leaves}; defaults to {@code leaves}. Unrecognized entries are ignored.
 *   <li>{@code trackTraversal}: boolean, default {@code false}. When absent, {@code
 *       useDefaultTraversal} is also set to {@code true}.
 *   <li>{@code window}: integer, optionally followed by {@code WEEKDAY} or {@code DAY}, which
 *       selects the interval constant; otherwise {@code TEN_SECOND_INTERVAL}. When absent the
 *       window is {@link Integer#MIN_VALUE} and the interval {@code -1}.
 *   <li>{@code lag} (default {@code 0}) and {@code maxDocFreq} (default {@code -1}): integers.
 *   <li>{@code zkHost}: optional. Falls back to the factory's collection zkHost, then to its
 *       default zkHost.
 *   <li>metric operands, and every other named param (copied verbatim into the params map).
 * </ul>
 *
 * @param expression the parsed expression
 * @param factory factory used to resolve operands, the inner stream, metrics and the zkHost
 * @throws IOException if the collection name, {@code gather} or {@code walk} is missing, if
 *     {@code walk} does not have exactly two {@code ->}-separated parts, or if no zkHost can be
 *     resolved
 * @throws NumberFormatException if {@code window}, {@code lag} or {@code maxDocFreq} is not an
 *     integer
 */
public GatherNodesStream(StreamExpression expression, StreamFactory factory) throws IOException {
  String collectionName = factory.getValueOperand(expression, 0);
  List<StreamExpressionNamedParameter> namedParams = factory.getNamedOperands(expression);
  StreamExpressionNamedParameter zkHostExpression = factory.getNamedOperand(expression, "zkHost");
  List<StreamExpression> streamExpressions =
      factory.getExpressionOperandsRepresentingTypes(
          expression, Expressible.class, TupleStream.class);

  // Collection name (required)
  if (null == collectionName) {
    throw new IOException(
        String.format(
            Locale.ROOT,
            "invalid expression %s - collectionName expected as first operand",
            expression));
  }

  Set<Traversal.Scatter> scatter =
      parseScatterParam(factory.getNamedOperand(expression, "scatter"));

  // gather (required)
  StreamExpressionNamedParameter gatherExpression = factory.getNamedOperand(expression, "gather");
  if (gatherExpression == null) {
    throw new IOException(
        String.format(
            Locale.ROOT, "invalid expression %s - gather param is required", expression));
  }
  String gather = ((StreamExpressionValue) gatherExpression.getParameter()).getValue();

  // walk (required): "from->to". Validate its syntax once, before building any inner stream.
  StreamExpressionNamedParameter edgeExpression = factory.getNamedOperand(expression, "walk");
  if (edgeExpression == null) {
    throw new IOException(
        String.format(Locale.ROOT, "invalid expression %s - walk param is required", expression));
  }
  String[] walkFields =
      splitWalkParam(
          ((StreamExpressionValue) edgeExpression.getParameter()).getValue(), expression);

  TupleStream stream;
  String traverseFrom;
  if (!streamExpressions.isEmpty()) {
    // Traverse from a field of the inner stream's tuples.
    stream = factory.constructStream(streamExpressions.get(0));
    traverseFrom = walkFields[0].trim();
  } else {
    // No inner stream: the left side of the walk is a literal, comma-separated node list.
    List<String> rootNodes = new ArrayList<>();
    for (String node : walkFields[0].split(",")) {
      rootNodes.add(node.trim());
    }
    stream = new NodeStream(rootNodes);
    traverseFrom = "node";
  }
  String traverseTo = walkFields[1].trim();

  List<Metric> metrics = new ArrayList<>();
  for (StreamExpression metricExpression :
      factory.getExpressionOperandsRepresentingTypes(expression, Expressible.class, Metric.class)) {
    metrics.add(factory.constructMetric(metricExpression));
  }

  boolean trackTraversal = false;
  StreamExpressionNamedParameter trackExpression =
      factory.getNamedOperand(expression, "trackTraversal");
  if (trackExpression != null) {
    trackTraversal =
        Boolean.parseBoolean(((StreamExpressionValue) trackExpression.getParameter()).getValue());
  } else {
    useDefaultTraversal = true;
  }

  StreamExpressionNamedParameter windowExpression = factory.getNamedOperand(expression, "window");
  int timeWindow = Integer.MIN_VALUE;
  int intervalParam = -1;
  if (windowExpression != null) {
    String windowValue = ((StreamExpressionValue) windowExpression.getParameter()).getValue();
    // WEEKDAY must be checked before DAY, because "WEEKDAY" also contains "DAY".
    // substring(0, indexOf(..)) is used instead of split(..)[0]: split drops all-empty results,
    // so "DAY".split("DAY")[0] would throw ArrayIndexOutOfBoundsException.
    if (windowValue.contains("WEEKDAY")) {
      intervalParam = WEEK_DAY_INTERVAL;
      timeWindow = Integer.parseInt(windowValue.substring(0, windowValue.indexOf("WEEKDAY")));
    } else if (windowValue.contains("DAY")) {
      intervalParam = DAY_INTERVAL;
      timeWindow = Integer.parseInt(windowValue.substring(0, windowValue.indexOf("DAY")));
    } else {
      intervalParam = TEN_SECOND_INTERVAL;
      timeWindow = Integer.parseInt(windowValue);
    }
  }

  StreamExpressionNamedParameter lagExpression = factory.getNamedOperand(expression, "lag");
  int timeLag = 0;
  if (lagExpression != null) {
    timeLag = Integer.parseInt(((StreamExpressionValue) lagExpression.getParameter()).getValue());
  }

  StreamExpressionNamedParameter docFreqExpression =
      factory.getNamedOperand(expression, "maxDocFreq");
  int docFreq = -1;
  if (docFreqExpression != null) {
    docFreq =
        Integer.parseInt(((StreamExpressionValue) docFreqExpression.getParameter()).getValue());
  }

  // Every named param not consumed above is forwarded unchanged to init().
  Map<String, String> params = new HashMap<>();
  for (StreamExpressionNamedParameter namedParam : namedParams) {
    if (!isReservedParam(namedParam.getName())) {
      params.put(namedParam.getName(), namedParam.getParameter().toString().trim());
    }
  }

  // zkHost, optional - if not provided then will look into factory list to get
  String zkHost = null;
  if (null == zkHostExpression) {
    zkHost = factory.getCollectionZkHost(collectionName);
    if (zkHost == null) {
      zkHost = factory.getDefaultZkHost();
    }
  } else if (zkHostExpression.getParameter() instanceof StreamExpressionValue) {
    zkHost = ((StreamExpressionValue) zkHostExpression.getParameter()).getValue();
  }
  if (null == zkHost) {
    throw new IOException(
        String.format(
            Locale.ROOT,
            "invalid expression %s - zkHost not found for collection '%s'",
            expression,
            collectionName));
  }

  // We've got all the required items
  init(
      zkHost,
      collectionName,
      stream,
      traverseFrom,
      traverseTo,
      gather,
      params,
      metrics,
      trackTraversal,
      scatter,
      docFreq,
      timeWindow,
      timeLag,
      intervalParam);
}

/**
 * Parses the optional {@code scatter} named param into a set of scatter modes.
 *
 * @param scatterExpression the {@code scatter} param, or {@code null} if absent
 * @return {@code {LEAVES}} when the param is absent; otherwise every comma-separated entry
 *     matching {@code BRANCHES} or {@code LEAVES} case-insensitively (other entries are ignored,
 *     so the result may be empty)
 */
private static Set<Traversal.Scatter> parseScatterParam(
    StreamExpressionNamedParameter scatterExpression) {
  Set<Traversal.Scatter> scatter = new HashSet<>();
  if (scatterExpression == null) {
    scatter.add(Traversal.Scatter.LEAVES);
    return scatter;
  }
  String value = ((StreamExpressionValue) scatterExpression.getParameter()).getValue();
  for (String entry : value.split(",")) {
    String trimmed = entry.trim();
    if (Traversal.Scatter.BRANCHES.toString().equalsIgnoreCase(trimmed)) {
      scatter.add(Traversal.Scatter.BRANCHES);
    } else if (Traversal.Scatter.LEAVES.toString().equalsIgnoreCase(trimmed)) {
      scatter.add(Traversal.Scatter.LEAVES);
    }
  }
  return scatter;
}

/**
 * Splits a {@code walk} value on {@code "->"}.
 *
 * @param edge the raw {@code walk} value
 * @param expression the enclosing expression, used only in the error message
 * @return the two untrimmed sides of the walk
 * @throws IOException if the value does not split into exactly two parts
 */
private static String[] splitWalkParam(String edge, StreamExpression expression)
    throws IOException {
  String[] fields = edge.split("->");
  if (fields.length != 2) {
    throw new IOException(
        String.format(
            Locale.ROOT,
            "invalid expression %s - walk param separated by an -> and must contain two fields",
            expression));
  }
  return fields;
}

/**
 * Returns whether {@code name} is a named param the constructor interprets itself, and that
 * therefore must not be copied into the params map.
 */
private static boolean isReservedParam(String name) {
  switch (name) {
    case "zkHost":
    case "gather":
    case "walk":
    case "scatter":
    case "maxDocFreq":
    case "trackTraversal":
    case "window":
    case "lag":
      return true;
    default:
      return false;
  }
}