in helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java [558:638]
/**
 * Registers the listeners this handler needs on {@code path}, driven by {@code _eventTypes}.
 *
 * <p>A data-change subscription is made on {@code path} when the event types contain
 * NodeDataChanged, NodeCreated or NodeDeleted. A child-change subscription is made on
 * {@code path} when they contain NodeChildrenChanged; in that case, if {@code watchChild} is
 * set, data-change subscriptions are also made on the children of {@code path}. For the
 * state/view change types enumerated in the switch, the children are read as ZNRecords so
 * that a child with a bucket size greater than zero additionally gets a child-change
 * subscription and a data-change subscription on each of its own children.
 *
 * <p>{@code ZkNoNodeException} and {@code HelixMetaDataAccessException} thrown while
 * subscribing on the children are logged at warn level and not rethrown. The elapsed time is
 * logged when the method completes normally.
 *
 * @param callbackType notification type handed to every subscription call
 * @param path path to subscribe on
 * @param watchChild whether the children of {@code path} are subscribed to as well
 */
private void subscribeForChanges(NotificationContext.Type callbackType, String path,
    boolean watchChild) {
  logger.info("CallbackHandler {} subscribing changes listener to path: {}, callback type: {}, "
          + "event types: {}, listener: {}, watchChild: {}",
      _uid, path, callbackType, _eventTypes, _listener, watchChild);

  final long startTimeMs = System.currentTimeMillis();

  final boolean listensToDataEvents = _eventTypes.contains(EventType.NodeDataChanged)
      || _eventTypes.contains(EventType.NodeCreated)
      || _eventTypes.contains(EventType.NodeDeleted);
  if (listensToDataEvents) {
    logger.info("CallbackHandler {} subscribing data change listener to path: {}", _uid, path);
    subscribeDataChange(path, callbackType);
  }

  if (_eventTypes.contains(EventType.NodeChildrenChanged)) {
    // The child-change subscription on the parent happens whether or not watchChild is set.
    final List<String> childNames = subscribeChildChange(path, callbackType);
    if (watchChild) {
      try {
        switch (_changeType) {
          case CURRENT_STATE:
          case TASK_CURRENT_STATE:
          case CUSTOMIZED_STATE:
          case IDEAL_STATE:
          case EXTERNAL_VIEW:
          case CUSTOMIZED_VIEW:
          case TARGET_EXTERNAL_VIEW: {
            // Read the children as records to find out which of them are bucketized.
            BaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<>(_zkClient);
            List<ZNRecord> childRecords = accessor.getChildren(path, null, 0, 0, 0);
            for (ZNRecord childRecord : childRecords) {
              HelixProperty childProperty = new HelixProperty(childRecord);
              String recordPath = path + "/" + childRecord.getId();
              if (childProperty.getBucketSize() <= 0) {
                // Not bucketized: a data-change subscription on the child is all that is needed.
                subscribeDataChange(recordPath, callbackType);
                continue;
              }

              // subscribe both data-change and child-change on bucketized parent node
              // data-change gives a delete-callback which is used to remove watch
              List<String> bucketNames = subscribeChildChange(recordPath, callbackType);
              subscribeDataChange(recordPath, callbackType);

              // subscribe data-change on every bucketized child
              if (bucketNames != null) {
                for (String bucketName : bucketNames) {
                  subscribeDataChange(recordPath + "/" + bucketName, callbackType);
                }
              }
            }
            break;
          }
          default: {
            // Other change types: reuse the child names returned by the parent subscription.
            if (childNames == null) {
              break;
            }
            for (String childName : childNames) {
              subscribeDataChange(path + "/" + childName, callbackType);
            }
            break;
          }
        }
      } catch (ZkNoNodeException | HelixMetaDataAccessException e) {
        //TODO: avoid calling getChildren for path that does not exist
        if (_changeType != CUSTOMIZED_STATE_ROOT) {
          // The exception rides along as the trailing argument so the logger records it.
          logger.warn("CallbackHandler {}, Failed to subscribe child/data change. path: {}, listener: {}",
              _uid, path, _listener, e);
        } else {
          // The exception is not attached to the log call in this branch.
          logger.warn(
              "CallbackHandler {}, Failed to subscribe child/data change on path: {}, listener: {}. Instance "
                  + "does not support Customized State!", _uid, path, _listener);
        }
      }
    }
  }

  logger.info("CallbackHandler{}, Subscribing to path: {} took: {}", _uid, path,
      System.currentTimeMillis() - startTimeMs);
}