private void process()

in stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java [1186:1316]


        private void process(final Subscriber<? super WriteEvent> subscriber) {

            try {

                // we ignore imported entity type information, entities get the type of the collection
                Stack<JsonToken> objectStartStack = new Stack();
                Stack<String> objectNameStack = new Stack();
                EntityRef lastEntity = null;

                String entityType = null;

                while (true) {

                    JsonToken token = jp.nextToken();

                    // nothing left to do.
                    if (token == null) {
                        break;
                    }

                    String name = jp.getCurrentName();


                    // start of an object with a field name

                    if (token.equals(JsonToken.START_OBJECT)) {

                        objectStartStack.push(token);

                        // nothing to do
                        if (name == null) {
                            continue;
                        }


                        if ("Metadata".equals(name)) {

                            Map<String, Object> entityMap = jp.readValueAs(HashMap.class);

                            UUID uuid = null;
                            if (entityMap.get("uuid") != null) {
                                uuid = UUID.fromString((String) entityMap.get("uuid"));
                                lastEntity = new SimpleEntityRef(entityType, uuid);
                            }

                            if (entitiesOnly) {
                                //logger.debug("{}Got entity with uuid {}", indent, lastEntity);

                                WriteEvent event = new EntityEvent(uuid, entityType, entityMap);
                                processWriteEvent(subscriber, event);
                            }
                            objectStartStack.pop();
                        } else if ("connections".equals(name)) {

                            Map<String, Object> connectionMap = jp.readValueAs(HashMap.class);

                            for (String type : connectionMap.keySet()) {
                                List targets = (List) connectionMap.get(type);

                                for (Object targetObject : targets) {
                                    UUID target = UUID.fromString((String) targetObject);

                                    if (!entitiesOnly) {
                                        //logger.debug("{}Got connection {} to {}",
                                        //new Object[]{indent, type, target.toString()});

                                        EntityRef entryRef = new SimpleEntityRef(target);
                                        WriteEvent event = new ConnectionEvent(lastEntity, type, entryRef);
                                        processWriteEvent(subscriber, event);
                                    }
                                }
                            }

                            objectStartStack.pop();

                        } else if ("dictionaries".equals(name)) {

                            Map<String, Object> dictionariesMap = jp.readValueAs(HashMap.class);
                            for (String dname : dictionariesMap.keySet()) {
                                Map dmap = (Map) dictionariesMap.get(dname);

                                if (!entitiesOnly) {
                                    //logger.debug("{}Got dictionary {} size {}",
                                    //new Object[] {indent, dname, dmap.size() });

                                    WriteEvent event = new DictionaryEvent(lastEntity, dname, dmap);
                                    processWriteEvent(subscriber, event);
                                }
                            }

                            objectStartStack.pop();

                        } else {
                            // push onto object names we don't immediately understand.  Used for parent detection
                            objectNameStack.push(name);
                        }

                    } else if (token.equals(JsonToken.START_ARRAY)) {
                        if (objectNameStack.size() == 1
                            && COLLECTION_OBJECT_NAME.equals(objectNameStack.peek())) {
                            entityType = InflectionUtils.singularize(name);
                        }

                    } else if (token.equals(JsonToken.END_OBJECT)) {
                        objectStartStack.pop();
                    }
                }

                if (subscriber != null) {
                    subscriber.onCompleted();
                }

                if (logger.isTraceEnabled()) {
                    logger.trace("process(): done parsing JSON");
                }

            } catch (Exception e) {

                tracker.fatal(e.getMessage());

                if (subscriber != null) {

                    // don't need to blow up here, we handled the problem
                    // if we blow up we may prevent in-flight entities from being written
                    // subscriber.onError(e);

                    // but we are done reading entities
                    subscriber.onCompleted();
                }
            }
        }