in stack/core/src/main/java/org/apache/usergrid/corepersistence/export/ExportServiceImpl.java [89:271]
/**
 * Streams a zip archive of the application selected by {@code exportRequestBuilder} to {@code stream}.
 *
 * <p>The archive contains:
 * <ul>
 *   <li>{@code entities/entities.N.json} - one entry per batch of up to 500 entity edges; each loaded entity
 *       is written as its {@code jsonSerializer} output followed by a newline;</li>
 *   <li>{@code connections/<sourceUuid>_<connectionName>_<targetUuid>.json} - one entry per exported
 *       connection whose edge is not deleted and whose target entity could still be loaded;</li>
 *   <li>{@code metadata.json} - application UUID, export version and start/finish timestamps;</li>
 *   <li>{@code stats.json} - total entity and connection counts plus entity counts per pluralized
 *       collection name.</li>
 * </ul>
 *
 * <p>The pipeline is subscribed on the RxJava io scheduler, but this method blocks until it finishes. On
 * success the zip stream is closed, which also closes {@code stream}. NOTE(review): on failure the zip
 * stream is not closed here; the caller is left with a partially written stream.
 *
 * <p>All text is encoded as UTF-8, independent of the JVM's default charset.
 *
 * @param exportRequestBuilder supplies the application scope to export; the scope is read with
 *                             {@code get()}, so it is expected to be present
 * @param stream               destination of the zip archive
 * @throws RuntimeException if writing to the zip stream fails (the {@link IOException} is the cause)
 */
public void export(final ExportRequestBuilder exportRequestBuilder, OutputStream stream) throws RuntimeException {
    final ZipOutputStream zipOutputStream = new ZipOutputStream(stream);
    zipOutputStream.setLevel(9); // best compression to reduce the amount of data to stream over the wire
    final ApplicationScope appScope = exportRequestBuilder.getApplicationScope().get();
    final Observable<ApplicationScope> applicationScopes = Observable.just(appScope);
    final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(appScope);
    final GraphManager gm = managerCache.getGraphManager(appScope);

    final AtomicInteger entityFileCount = new AtomicInteger(); // entities are batched into files
    final AtomicInteger connectionCount = new AtomicInteger();
    // per-entity-type counters, plus one overall counter stored under keyTotalEntityCount
    final Map<String, AtomicInteger> collectionStats = new HashMap<>();
    collectionStats.put(keyTotalEntityCount, new AtomicInteger());

    final Map<String, Object> infoMap = new HashMap<>();
    infoMap.put("application", appScope.getApplication().getUuid().toString());
    infoMap.put("exportVersion", exportVersion);
    infoMap.put("exportStarted", System.currentTimeMillis());

    logger.info("Starting export of application: {}", appScope.getApplication().getUuid().toString());

    allEntityIdsObservable.getEdgesToEntities(applicationScopes, Optional.absent(), Optional.absent())
        .buffer(500)
        .map(edgeScopes -> {
            // collect the target entity ids of this batch, skipping edges without a target
            final List<Id> entityIds = new ArrayList<>();
            edgeScopes.forEach(edgeScope -> {
                if (edgeScope.getEdge().getTargetNode() != null) {
                    logger.debug("adding entity to list: {}", edgeScope.getEdge().getTargetNode());
                    entityIds.add(edgeScope.getEdge().getTargetNode());
                }
            });
            return entityIds;
        })
        .flatMap(entityIds -> {
            logger.debug("entityIds: {}", entityIds);
            // batch load the entities
            final EntitySet entitySet = ecm.load(entityIds).toBlocking().lastOrDefault(new EntitySetImpl(0));
            final String filenameWithPath = "entities/entities." + entityFileCount.get() + ".json";
            try {
                logger.debug("adding zip entry: {}", filenameWithPath);
                zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
                entitySet.getEntities().forEach(mvccEntity -> {
                    if (!mvccEntity.getEntity().isPresent()) {
                        logger.warn("entityId {} did not have corresponding entity, not writing", mvccEntity.getId());
                        return;
                    }
                    final Map entityMap = CpEntityMapUtils.toMap(mvccEntity.getEntity().get());
                    try {
                        collectionStats.computeIfAbsent(mvccEntity.getId().getType(), type -> new AtomicInteger())
                            .incrementAndGet();
                        collectionStats.get(keyTotalEntityCount).incrementAndGet();
                        logger.debug("writing and flushing entity {} to zip stream for file: {}",
                            mvccEntity.getId().getUuid().toString(), filenameWithPath);
                        zipOutputStream.write(utf8Bytes(jsonSerializer.toString(entityMap)));
                        zipOutputStream.write(utf8Bytes("\n"));
                        zipOutputStream.flush(); // entities can be large, flush after each
                    } catch (IOException e) {
                        logger.warn("unable to write entry in zip stream for entityId: {}", mvccEntity.getId(), e);
                        throw new RuntimeException("Unable to export data. Error writing to stream.", e);
                    }
                });
                zipOutputStream.closeEntry();
                entityFileCount.incrementAndGet();
            } catch (IOException e) {
                throw new RuntimeException("Unable to export data. Error writing to stream.", e);
            }
            // re-emit the loaded entities individually so their connections can be exported below
            return Observable.from(entitySet.getEntities());
        })
        .doOnNext(mvccEntity -> {
            gm.getEdgeTypesFromSource(CpNamingUtils.createConnectionTypeSearch(mvccEntity.getId()))
                .flatMap(emittedEdgeType -> {
                    logger.debug("loading edges of type {} from node {}", emittedEdgeType, mvccEntity.getId());
                    return gm.loadEdgesFromSource(new SimpleSearchByEdgeType(mvccEntity.getId(),
                        emittedEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent()));
                })
                .doOnNext(markedEdge -> {
                    if (markedEdge.isDeleted() || markedEdge.isTargetNodeDeleted()
                        || markedEdge.getTargetNode() == null) {
                        return;
                    }
                    // load the target again to make sure connections to missing entities are not exported
                    final Entity entity = ecm.load(markedEdge.getTargetNode()).toBlocking().lastOrDefault(null);
                    if (entity == null) {
                        logger.warn("Exported connection has a missing target node, not creating connection in export. Edge: {}", markedEdge);
                        return;
                    }
                    final String sourceUuid = markedEdge.getSourceNode().getUuid().toString();
                    final String connectionName = CpNamingUtils.getConnectionNameFromEdgeName(markedEdge.getType());
                    final String targetUuid = markedEdge.getTargetNode().getUuid().toString();
                    // since a single stream is being written and connections are loaded per entity,
                    // they cannot easily be batched evenly into files, so each connection gets its own entry
                    final String filenameWithPath =
                        "connections/" + sourceUuid + "_" + connectionName + "_" + targetUuid + ".json";
                    try {
                        logger.debug("adding zip entry: {}", filenameWithPath);
                        zipOutputStream.putNextEntry(new ZipEntry(filenameWithPath));
                        final Map<String, String> connectionMap = new HashMap<>(4);
                        connectionMap.put("sourceNodeUUID", sourceUuid);
                        connectionMap.put("relationship", connectionName);
                        connectionMap.put("targetNodeUUID", targetUuid);
                        final String connectionJson = jsonSerializer.toString(connectionMap);
                        logger.debug("writing and flushing connection to zip stream: {}", connectionJson);
                        zipOutputStream.write(utf8Bytes(connectionJson));
                        zipOutputStream.closeEntry();
                        zipOutputStream.flush();
                        connectionCount.incrementAndGet();
                    } catch (IOException e) {
                        logger.warn("Unable to create entry in zip export for edge {}", markedEdge.toString(), e);
                        throw new RuntimeException("Unable to export data. Error writing to stream.", e);
                    }
                }).toBlocking().lastOrDefault(null);
        })
        .doOnCompleted(() -> {
            infoMap.put("exportFinished", System.currentTimeMillis());
            try {
                zipOutputStream.putNextEntry(new ZipEntry("metadata.json"));
                zipOutputStream.write(utf8Bytes(jsonSerializer.toString(infoMap)));
                zipOutputStream.closeEntry();

                zipOutputStream.putNextEntry(new ZipEntry("stats.json"));
                final Map<String, Object> stats = new HashMap<>();
                stats.put("totalEntities", collectionStats.get(keyTotalEntityCount).get());
                stats.put("totalConnections", connectionCount.get());
                // once the total is removed only the per-type counters remain
                collectionStats.remove(keyTotalEntityCount);
                final Map<String, Integer> collectionCounts = new HashMap<>(collectionStats.size());
                collectionStats.forEach((collection, count) ->
                    collectionCounts.put(InflectionUtils.pluralize(collection), count.get()));
                stats.put("collectionCounts", collectionCounts);
                zipOutputStream.write(utf8Bytes(jsonSerializer.toString(stats)));
                zipOutputStream.closeEntry();

                logger.debug("closing zip stream");
                zipOutputStream.close();
                logger.info("Finished export of application: {}", appScope.getApplication().getUuid().toString());
            } catch (IOException e) {
                throw new RuntimeException("Unable to export data due to inability to close zip stream.", e);
            }
        })
        .subscribeOn(Schedulers.io()).toBlocking().lastOrDefault(null);
}

/**
 * Encodes {@code value} as UTF-8 so the exported JSON does not depend on the JVM's default charset.
 *
 * @param value text to encode
 * @return the UTF-8 bytes of {@code value}
 */
private static byte[] utf8Bytes(final String value) {
    return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
}