public void doImport()

in stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java [523:667]
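
Scheduler entry point for an import job: it reads the S3 bucket and credentials from the job's "importInfo" data, marks the Import entity as STARTED, lists the .json files in the bucket, creates a file-import task (and entity connection) for each file, waits until all connections are visible, and then schedules the per-file import jobs.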


    public void doImport(JobExecution jobExecution) throws Exception {

        if (logger.isTraceEnabled()) {
            logger.trace("doImport()");
        }

        @SuppressWarnings("unchecked")
        Map<String, Object> config = (Map<String, Object>) jobExecution.getJobData().getProperty("importInfo");
        if (config == null) {
            logger.error("doImport(): Import Information passed through is null");
            return;
        }

        // the import request's "properties" payload carries the S3 bucket name and credentials under "storage_info"
        @SuppressWarnings("unchecked")
        Map<String, Object> properties = (Map<String, Object>) config.get("properties");
        @SuppressWarnings("unchecked")
        Map<String, Object> storageInfo = (Map<String, Object>) properties.get("storage_info");

        String bucketName = (String) storageInfo.get("bucket_location");
        String accessId = (String) storageInfo.get("s3_access_id");
        String secretKey = (String) storageInfo.get("s3_key");

        // get Import Entity from the management app, update it to show that job has started

        final EntityManager rootEM = emf.getEntityManager(emf.getManagementAppId());
        UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
        Import importEntity = rootEM.get(importId, Import.class);

        importEntity.setState(Import.State.STARTED);
        importEntity.setStarted(System.currentTimeMillis());
        importEntity.setErrorMessage(" ");
        rootEM.update(importEntity);
        if (logger.isTraceEnabled()) {
            logger.trace("doImport(): updated state");
        }

        // if no S3 importer was passed in then create one

        S3Import s3Import;
        Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
        try {
            if (s3PlaceHolder != null) {
                s3Import = (S3Import) s3PlaceHolder;
            } else {
                s3Import = new S3ImportImpl();
            }
        } catch (Exception e) {
            logger.error("doImport(): Error creating S3Import", e);
            importEntity.setErrorMessage(e.getMessage());
            importEntity.setState(Import.State.FAILED);
            rootEM.update(importEntity);
            return;
        }

        // get list of all JSON files in S3 bucket

        final List<String> bucketFiles;
        try {

            if (config.get("organizationId") == null) {
                logger.error("doImport(): No organization could be found");
                importEntity.setErrorMessage("No organization could be found");
                importEntity.setState(Import.State.FAILED);
                rootEM.update(importEntity);
                return;
            }

            if (config.get("applicationId") == null) {
                throw new UnsupportedOperationException("Import applications not supported");
            }

            bucketFiles = s3Import.getBucketFileNames(bucketName, ".json", accessId, secretKey);

        } catch (OrganizationNotFoundException | ApplicationNotFoundException e) {
            importEntity.setErrorMessage(e.getMessage());
            importEntity.setState(Import.State.FAILED);
            rootEM.update(importEntity);
            return;
        }


        // schedule a FileImport job for each file found in the bucket

        if (bucketFiles.isEmpty()) {
            importEntity.setState(Import.State.FINISHED);
            importEntity.setErrorMessage("No files found in the bucket: " + bucketName);
            rootEM.update(importEntity);

        } else {

            Map<String, Object> fileMetadata = new HashMap<>();
            ArrayList<Map<String, Object>> value = new ArrayList<>();
            final List<JobData> fileJobs = new ArrayList<>(bucketFiles.size());

            // create the Entity Connection and set up metadata for each job

            for (String bucketFile : bucketFiles) {
                final JobData jobData = createFileTask(config, bucketFile, importEntity);
                fileJobs.add(jobData);
            }

            int retries = 0;
            int maxRetries = 60;
            boolean done = false;
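            // createFileTask() connected each FileImport to the Import entity; poll until
            // getConnectionCount() reports all of them, since connection queries may lag behind the writes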
            while (!done && retries++ < maxRetries) {

                final int count = getConnectionCount(importEntity);

                if (count == fileJobs.size()) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Got ALL {} of {} expected connections", count, fileJobs.size());
                    }
                    done = true;
                } else {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Got {} of {} expected connections. Waiting...", count, fileJobs.size());
                    }
                    Thread.sleep(1000);
                }
            }
            // fail only if the wait actually timed out (done can become true on the final retry)
            if (!done) {
                throw new RuntimeException("Max retries (" + maxRetries + ") reached waiting for import file connections");
            }

            // schedule each job

            for (JobData jobData : fileJobs) {

                final JobData scheduled = scheduleFileTasks(jobData);

                Map<String, Object> fileJobID = new HashMap<>();
                fileJobID.put("FileName", scheduled.getProperty("File"));
                fileJobID.put("JobID", scheduled.getUuid());
                value.add(fileJobID);
            }

            // record each file name and its scheduled job ID on the Import entity
            fileMetadata.put("files", value);
            importEntity.addProperties(fileMetadata);
            importEntity.setFileCount(fileJobs.size());
            rootEM.update(importEntity);
        }
    }
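
For reference, the "s3Import" job-data check near the top means a caller can hand the job its own S3Import instead of the default S3ImportImpl (useful in tests, where the JobData given to the execution is built in memory). Below is a minimal sketch of seeding that job data, assuming JobData exposes setProperty(...) as the counterpart of the getProperty(...) calls used above, and using a Mockito mock so the full S3Import interface does not have to be implemented; configMap and importEntityId are placeholders.

    // Sketch only (not from the source): Mockito, Collections, JobData and S3Import imports assumed.
    S3Import stubImporter = Mockito.mock(S3Import.class);
    Mockito.when(stubImporter.getBucketFileNames("my-bucket", ".json", "access-id", "secret-key"))
        .thenReturn(Collections.singletonList("my-org/my-app.1.json"));

    JobData jobData = new JobData();
    jobData.setProperty("s3Import", stubImporter);   // doImport() uses this instead of new S3ImportImpl()
    jobData.setProperty(IMPORT_ID, importEntityId);  // UUID of the Import entity in the management app
    jobData.setProperty("importInfo", configMap);    // the map read at the top of doImport()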