in modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java [126:219]
/**
 * Sets up the Accumulo table and ZooKeeper state for this application using the supplied client.
 *
 * <p>
 * Steps, in order: verify the table precondition, drop the existing table (only when the options
 * ask for it), remove the application's ZooKeeper root if present, initialize the application in
 * ZooKeeper, optionally copy jars to DFS and register a classpath context for the table,
 * optionally copy the observer init directory to DFS, create the table, and finally update the
 * shared configuration.
 *
 * @param opts initialization options; {@code getClearTable()} decides whether an existing table
 *        may be dropped
 * @param client Accumulo client used for table and instance operations
 * @throws TableExistsException if the configured table exists and clearing it was not requested
 * @throws AlreadyInitializedException if a {@code NodeExistsException} is raised while
 *         initializing the application or creating the table
 */
private void initialize(InitializationOptions opts, AccumuloClient client)
    throws TableExistsException, AlreadyInitializedException {
  final String tableName = config.getAccumuloTable();

  if (client.tableOperations().exists(tableName)) {
    if (!opts.getClearTable()) {
      throw new TableExistsException("Accumulo table already exists " + tableName);
    }

    // Preconditions hold from here on, so removing the table and the ZooKeeper root (when they
    // exist) is allowed.
    logger.info("The Accumulo table '{}' will be dropped and created as requested by user",
        tableName);
    try {
      client.tableOperations().delete(tableName);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  // Remove any previous ZooKeeper state stored under the application root.
  try {
    boolean appRootPresent = rootCurator.checkExists().forPath(appRootDir) != null;
    if (appRootPresent) {
      logger.info("Clearing Fluo '{}' application in Zookeeper at {}",
          config.getApplicationName(), config.getAppZookeepers());
      rootCurator.delete().deletingChildrenIfNeeded().forPath(appRootDir);
    }
  } catch (KeeperException.NoNodeException nne) {
    // a missing node is fine, there is simply nothing to delete
  } catch (Exception e) {
    String message = "An error occurred deleting Zookeeper root of ["
        + config.getAppZookeepers() + "], error=[" + e.getMessage() + "]";
    logger.error(message);
    throw new RuntimeException(e);
  }

  try {
    initializeApplicationInZooKeeper(client);

    // Decide which jars (if any) are copied to DFS for the Accumulo classpath context.
    final String configuredJars = config.getAccumuloJars().trim();
    final boolean dfsRootUnset = config.getDfsRoot().trim().isEmpty();
    final String jarsToCopy;
    if (configuredJars.isEmpty()) {
      // No explicit jar list: use the classpath jars only when a DFS root is configured.
      jarsToCopy = dfsRootUnset ? "" : getJarsFromClasspath();
    } else {
      if (dfsRootUnset) {
        throw new IllegalStateException("The property " + FluoConfiguration.ACCUMULO_JARS_PROP
            + " is set and " + FluoConfiguration.DFS_ROOT_PROP
            + " is not set. So there is nowhere to copy the jars.");
      }
      jarsToCopy = configuredJars;
    }

    Map<String, String> tableProps = new HashMap<>();
    if (!jarsToCopy.isEmpty()) {
      String dfsClasspath = copyJarsToDfs(jarsToCopy, "lib/accumulo");
      if (!dfsClasspath.isEmpty()) {
        // Register a classpath context for this application and point the new table at it.
        String contextName = "fluo-" + config.getApplicationName();
        client.instanceOperations().setProperty(
            AccumuloProps.VFS_CONTEXT_CLASSPATH_PROPERTY + contextName, dfsClasspath);
        tableProps.put(AccumuloProps.TABLE_CLASSPATH, contextName);
      }
    }

    // The observer init dir is only copied when no observer jars URL has been set.
    if (config.getObserverJarsUrl().isEmpty()) {
      String observerInitDir = config.getObserverInitDir().trim();
      if (!observerInitDir.isEmpty()) {
        config.setObserverJarsUrl(copyDirToDfs(observerInitDir, "lib/observers"));
      }
    }

    tableProps.put(AccumuloProps.TABLE_BLOCKCACHE_ENABLED, "true");
    tableProps.put(AccumuloProps.TABLE_DELETE_BEHAVIOR,
        AccumuloProps.TABLE_DELETE_BEHAVIOR_VALUE);

    NewTableConfiguration tableConfig = new NewTableConfiguration().withoutDefaultIterators();
    tableConfig.setLocalityGroups(
        Collections.singletonMap(ColumnConstants.NOTIFY_LOCALITY_GROUP_NAME,
            Collections.singleton(new Text(ColumnConstants.NOTIFY_CF.toArray()))));
    tableConfig.enableSummarization(FluoSummarizer.CONFIG);
    configureIterators(tableConfig);
    tableConfig.setProperties(tableProps);

    client.tableOperations().create(tableName, tableConfig);

    updateSharedConfig();
  } catch (NodeExistsException nee) {
    throw new AlreadyInitializedException();
  } catch (RuntimeException e) {
    // unchecked failures propagate unchanged
    throw e;
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}