in twill-zookeeper/src/main/java/org/apache/twill/internal/zookeeper/DefaultZKClientService.java [140:242]
private OperationFuture<String> doCreate(final String path,
@Nullable final byte[] data,
final CreateMode createMode,
final boolean createParent,
final List<ACL> acl,
final boolean ignoreNodeExists) {
final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor);
getZooKeeper().create(path, data, acl, createMode, Callbacks.STRING, createFuture);
if (!createParent) {
return createFuture;
}
// If create parent is request, return a different future
final SettableOperationFuture<String> result = SettableOperationFuture.create(path, eventExecutor);
// Watch for changes in the original future
Futures.addCallback(createFuture, new FutureCallback<String>() {
@Override
public void onSuccess(String path) {
// Propagate if creation was successful
result.set(path);
}
@Override
public void onFailure(Throwable t) {
// See if the failure can be handled
if (updateFailureResult(t, result, path, ignoreNodeExists)) {
return;
}
// Create the parent node
String parentPath = getParent(path);
if (parentPath.isEmpty()) {
result.setException(t);
return;
}
// Watch for parent creation complete. Parent is created with the unsafe ACL.
Futures.addCallback(doCreate(parentPath, null, CreateMode.PERSISTENT,
true, ZooDefs.Ids.OPEN_ACL_UNSAFE, true), new FutureCallback<String>() {
@Override
public void onSuccess(String parentPath) {
// Create the requested path again
Futures.addCallback(
doCreate(path, data, createMode, false, acl, ignoreNodeExists), new FutureCallback<String>() {
@Override
public void onSuccess(String pathResult) {
result.set(pathResult);
}
@Override
public void onFailure(Throwable t) {
// handle the failure
updateFailureResult(t, result, path, ignoreNodeExists);
}
});
}
@Override
public void onFailure(Throwable t) {
result.setException(t);
}
});
}
/**
* Updates the result future based on the given {@link Throwable}.
* @param t Cause of the failure
* @param result Future to be updated
* @param path Request path for the operation
* @return {@code true} if it is a failure, {@code false} otherwise.
*/
private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> result,
String path, boolean ignoreNodeExists) {
// Propagate if there is error
if (!(t instanceof KeeperException)) {
result.setException(t);
return true;
}
KeeperException.Code code = ((KeeperException) t).code();
// Node already exists, simply return success if it allows for ignoring node exists (for parent node creation).
if (ignoreNodeExists && code == KeeperException.Code.NODEEXISTS) {
// The requested path could be used because it only applies to non-sequential node
result.set(path);
return false;
}
if (code != KeeperException.Code.NONODE) {
result.setException(t);
return true;
}
return false;
}
/**
* Gets the parent of the given path.
* @param path Path for computing its parent
* @return Parent of the given path, or empty string if the given path is the root path already.
*/
private String getParent(String path) {
String parentPath = path.substring(0, path.lastIndexOf('/'));
return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath;
}
});
return result;
}