private void subscribeForChanges()

in helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java [558:638]


  private void subscribeForChanges(NotificationContext.Type callbackType, String path,
      boolean watchChild) {

    logger.info("CallbackHandler {} subscribing changes listener to path: {}, callback type: {}, "
            + "event types: {}, listener: {}, watchChild: {}",
        _uid, path, callbackType, _eventTypes, _listener, watchChild);

    long start = System.currentTimeMillis();
    if (_eventTypes.contains(EventType.NodeDataChanged)
        || _eventTypes.contains(EventType.NodeCreated)
        || _eventTypes.contains(EventType.NodeDeleted)) {
      logger.info("CallbackHandler {} subscribing data change listener to path: {}", _uid, path);
      subscribeDataChange(path, callbackType);
    }

    if (_eventTypes.contains(EventType.NodeChildrenChanged)) {
      List<String> children = subscribeChildChange(path, callbackType);
      if (watchChild) {
        try {
          switch (_changeType) {
            case CURRENT_STATE:
            case TASK_CURRENT_STATE:
            case CUSTOMIZED_STATE:
            case IDEAL_STATE:
            case EXTERNAL_VIEW:
            case CUSTOMIZED_VIEW:
            case TARGET_EXTERNAL_VIEW: {
              // check if bucketized
              BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
              List<ZNRecord> records = baseAccessor.getChildren(path, null, 0, 0, 0);
              for (ZNRecord record : records) {
                HelixProperty property = new HelixProperty(record);
                String childPath = path + "/" + record.getId();

                int bucketSize = property.getBucketSize();
                if (bucketSize > 0) {
                  // subscribe both data-change and child-change on bucketized parent node
                  // data-change gives a delete-callback which is used to remove watch
                  List<String> bucketizedChildNames = subscribeChildChange(childPath, callbackType);
                  subscribeDataChange(childPath, callbackType);

                  // subscribe data-change on bucketized child
                  if (bucketizedChildNames != null) {
                    for (String bucketizedChildName : bucketizedChildNames) {
                      String bucketizedChildPath = childPath + "/" + bucketizedChildName;
                      subscribeDataChange(bucketizedChildPath, callbackType);
                    }
                  }
                } else {
                  subscribeDataChange(childPath, callbackType);
                }
              }
              break;
            }
            default: {
              if (children != null) {
                for (String child : children) {
                  String childPath = path + "/" + child;
                  subscribeDataChange(childPath, callbackType);
                }
              }
              break;
            }
          }
        } catch (ZkNoNodeException | HelixMetaDataAccessException e) {
          //TODO: avoid calling getChildren for path that does not exist
          if (_changeType == CUSTOMIZED_STATE_ROOT) {
            logger.warn(
                "CallbackHandler {}, Failed to subscribe child/data change on path: {}, listener: {}. Instance "
                    + "does not support Customized State!", _uid, path, _listener);
          } else {
            logger.warn("CallbackHandler {}, Failed to subscribe child/data change. path: {}, listener: {}",
                _uid, path, _listener, e);
          }
        }
      }
    }

    long end = System.currentTimeMillis();
    logger.info("CallbackHandler{}, Subscribing to path: {} took: {}", _uid, path, (end - start));
  }