in stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java [523:667]
public void doImport(JobExecution jobExecution) throws Exception {
    // Entry point of an import job. Reads the "importInfo" map from the job data, marks the
    // Import entity as STARTED, lists the ".json" files in the configured S3 bucket, creates one
    // file task per file, waits until the Import entity reports one connection per file task,
    // and then schedules every file task and records the file-name/job-id pairs on the entity.
    //
    // NOTE(review): "properties", "storage_info" and the loaded Import entity are dereferenced
    // without null checks; a missing value surfaces as a NullPointerException from this method.
    if (logger.isTraceEnabled()) {
        logger.trace("doImport()");
    }

    @SuppressWarnings("unchecked")
    Map<String, Object> config = (Map<String, Object>) jobExecution.getJobData().getProperty("importInfo");
    if (config == null) {
        logger.error("doImport(): Import Information passed through is null");
        return;
    }

    @SuppressWarnings("unchecked")
    Map<String, Object> properties = (Map<String, Object>) config.get("properties");
    @SuppressWarnings("unchecked")
    Map<String, Object> storage_info = (Map<String, Object>) properties.get("storage_info");

    String bucketName = (String) storage_info.get("bucket_location");
    String accessId = (String) storage_info.get("s3_access_id");
    String secretKey = (String) storage_info.get("s3_key");

    // get Import Entity from the management app, update it to show that job has started
    final EntityManager rootEM = emf.getEntityManager(emf.getManagementAppId());
    UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
    Import importEntity = rootEM.get(importId, Import.class);

    importEntity.setState(Import.State.STARTED);
    importEntity.setStarted(System.currentTimeMillis());
    importEntity.setErrorMessage(" ");
    rootEM.update(importEntity);
    if (logger.isTraceEnabled()) {
        logger.trace("doImport(): updated state");
    }

    // if no S3 importer was passed in then create one
    S3Import s3Import;
    Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
    try {
        if (s3PlaceHolder != null) {
            s3Import = (S3Import) s3PlaceHolder;
        } else {
            s3Import = new S3ImportImpl();
        }
    } catch (Exception e) {
        // any failure here (including a bad cast of the supplied object) fails the import
        logger.error("doImport(): Error creating S3Import", e);
        importEntity.setErrorMessage(e.getMessage());
        importEntity.setState(Import.State.FAILED);
        rootEM.update(importEntity);
        return;
    }

    // get list of all JSON files in S3 bucket
    final List<String> bucketFiles;
    try {
        if (config.get("organizationId") == null) {
            logger.error("doImport(): No organization could be found");
            importEntity.setErrorMessage("No organization could be found");
            importEntity.setState(Import.State.FAILED);
            rootEM.update(importEntity);
            return;
        } else {
            if (config.get("applicationId") == null) {
                // NOTE(review): this exception is not handled by the catch below, so it
                // propagates to the caller without the Import entity being marked FAILED.
                throw new UnsupportedOperationException("Import applications not supported");
            } else {
                bucketFiles = s3Import.getBucketFileNames(bucketName, ".json", accessId, secretKey);
            }
        }
    } catch (OrganizationNotFoundException | ApplicationNotFoundException e) {
        importEntity.setErrorMessage(e.getMessage());
        importEntity.setState(Import.State.FAILED);
        rootEM.update(importEntity);
        return;
    }

    // schedule a FileImport job for each file found in the bucket
    if (bucketFiles.isEmpty()) {
        importEntity.setState(Import.State.FINISHED);
        importEntity.setErrorMessage("No files found in the bucket: " + bucketName);
        rootEM.update(importEntity);
    } else {
        Map<String, Object> fileMetadata = new HashMap<>();
        List<Map<String, Object>> value = new ArrayList<>();
        final List<JobData> fileJobs = new ArrayList<>(bucketFiles.size());

        // create the Entity Connection and set up metadata for each job
        for (String bucketFile : bucketFiles) {
            final JobData jobData = createFileTask(config, bucketFile, importEntity);
            fileJobs.add(jobData);
        }

        // Poll once per second, at most maxRetries times, until the connection count reported
        // for the Import entity equals the number of file tasks created above.
        int retries = 0;
        final int maxRetries = 60;
        boolean done = false;
        while (!done && retries++ < maxRetries) {
            final int count = getConnectionCount(importEntity);
            if (count == fileJobs.size()) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Got ALL {} of {} expected connections", count, fileJobs.size());
                }
                done = true;
            } else {
                if (logger.isTraceEnabled()) {
                    logger.trace("Got {} of {} expected connections. Waiting...", count, fileJobs.size());
                }
                Thread.sleep(1000);
            }
        }

        // Decide on the outcome flag rather than the counter: when the final permitted attempt
        // succeeds, retries already equals maxRetries, so a counter-based check would report a
        // timeout for a wait that actually completed.
        if (!done) {
            throw new RuntimeException("Max retries was reached");
        }

        // schedule each job
        for (JobData jobData : fileJobs) {
            final JobData scheduled = scheduleFileTasks(jobData);
            Map<String, Object> fileJobID = new HashMap<>();
            fileJobID.put("FileName", scheduled.getProperty("File"));
            fileJobID.put("JobID", scheduled.getUuid());
            value.add(fileJobID);
        }

        // record which job handles which file, plus the total file count, on the Import entity
        fileMetadata.put("files", value);
        importEntity.addProperties(fileMetadata);
        importEntity.setFileCount(fileJobs.size());
        rootEM.update(importEntity);
    }
}