in stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java [1186:1316]
private void process(final Subscriber<? super WriteEvent> subscriber) {
try {
// we ignore imported entity type information, entities get the type of the collection
Stack<JsonToken> objectStartStack = new Stack();
Stack<String> objectNameStack = new Stack();
EntityRef lastEntity = null;
String entityType = null;
while (true) {
JsonToken token = jp.nextToken();
// nothing left to do.
if (token == null) {
break;
}
String name = jp.getCurrentName();
// start of an object with a field name
if (token.equals(JsonToken.START_OBJECT)) {
objectStartStack.push(token);
// nothing to do
if (name == null) {
continue;
}
if ("Metadata".equals(name)) {
Map<String, Object> entityMap = jp.readValueAs(HashMap.class);
UUID uuid = null;
if (entityMap.get("uuid") != null) {
uuid = UUID.fromString((String) entityMap.get("uuid"));
lastEntity = new SimpleEntityRef(entityType, uuid);
}
if (entitiesOnly) {
//logger.debug("{}Got entity with uuid {}", indent, lastEntity);
WriteEvent event = new EntityEvent(uuid, entityType, entityMap);
processWriteEvent(subscriber, event);
}
objectStartStack.pop();
} else if ("connections".equals(name)) {
Map<String, Object> connectionMap = jp.readValueAs(HashMap.class);
for (String type : connectionMap.keySet()) {
List targets = (List) connectionMap.get(type);
for (Object targetObject : targets) {
UUID target = UUID.fromString((String) targetObject);
if (!entitiesOnly) {
//logger.debug("{}Got connection {} to {}",
//new Object[]{indent, type, target.toString()});
EntityRef entryRef = new SimpleEntityRef(target);
WriteEvent event = new ConnectionEvent(lastEntity, type, entryRef);
processWriteEvent(subscriber, event);
}
}
}
objectStartStack.pop();
} else if ("dictionaries".equals(name)) {
Map<String, Object> dictionariesMap = jp.readValueAs(HashMap.class);
for (String dname : dictionariesMap.keySet()) {
Map dmap = (Map) dictionariesMap.get(dname);
if (!entitiesOnly) {
//logger.debug("{}Got dictionary {} size {}",
//new Object[] {indent, dname, dmap.size() });
WriteEvent event = new DictionaryEvent(lastEntity, dname, dmap);
processWriteEvent(subscriber, event);
}
}
objectStartStack.pop();
} else {
// push onto object names we don't immediately understand. Used for parent detection
objectNameStack.push(name);
}
} else if (token.equals(JsonToken.START_ARRAY)) {
if (objectNameStack.size() == 1
&& COLLECTION_OBJECT_NAME.equals(objectNameStack.peek())) {
entityType = InflectionUtils.singularize(name);
}
} else if (token.equals(JsonToken.END_OBJECT)) {
objectStartStack.pop();
}
}
if (subscriber != null) {
subscriber.onCompleted();
}
if (logger.isTraceEnabled()) {
logger.trace("process(): done parsing JSON");
}
} catch (Exception e) {
tracker.fatal(e.getMessage());
if (subscriber != null) {
// don't need to blow up here, we handled the problem
// if we blow up we may prevent in-flight entities from being written
// subscriber.onError(e);
// but we are done reading entities
subscriber.onCompleted();
}
}
}