List update()

in helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java [1132:1279]


  List<T> update(List<String> paths, List<DataUpdater<T>> updaters, List<List<String>> pathsCreated,
      List<Stat> stats, int options) {
    if (paths == null || paths.size() == 0) {
      LOG.error("paths is null or empty");
      return Collections.emptyList();
    }

    if (updaters.size() != paths.size() || (pathsCreated != null && pathsCreated.size() != paths
        .size())) {
      throw new IllegalArgumentException(
          "paths, updaters, and pathsCreated should be of same size");
    }

    List<Stat> setStats = new ArrayList<Stat>(Collections.<Stat>nCopies(paths.size(), null));
    List<T> updateData = new ArrayList<T>(Collections.<T>nCopies(paths.size(), null));

    CreateMode mode = AccessOption.getMode(options);
    if (mode == null) {
      LOG.error("Invalid update mode. options: " + options);
      return updateData;
    }

    ZkAsyncCallbacks.SetDataCallbackHandler[] cbList =
        new ZkAsyncCallbacks.SetDataCallbackHandler[paths.size()];
    ZkAsyncCallbacks.CreateCallbackHandler[] createCbList = null;
    boolean[] needUpdate = new boolean[paths.size()];
    Arrays.fill(needUpdate, true);

    long startT = System.nanoTime();

    try {
      boolean retry;
      do {
        retry = false;
        boolean[] needCreate = new boolean[paths.size()]; // init'ed with false
        boolean failOnNoNode = false;

        // asycn read all data
        List<Stat> curStats = new ArrayList<Stat>();
        List<T> curDataList =
            get(paths, curStats, Arrays.copyOf(needUpdate, needUpdate.length), false);

        // async update
        List<T> newDataList = new ArrayList<T>();
        for (int i = 0; i < paths.size(); i++) {
          if (!needUpdate[i]) {
            newDataList.add(null);
            continue;
          }
          String path = paths.get(i);
          DataUpdater<T> updater = updaters.get(i);
          T newData = updater.update(curDataList.get(i));
          newDataList.add(newData);
          if (newData == null) {
            // No need to create or update if the updater does not return a new version
            continue;
          }
          Stat curStat = curStats.get(i);
          if (curStat == null) {
            // node not exists
            failOnNoNode = true;
            needCreate[i] = true;
          } else {
            cbList[i] = new ZkAsyncCallbacks.SetDataCallbackHandler();
            _zkClient.asyncSetData(path, newData, curStat.getVersion(), cbList[i]);
          }
        }

        // wait for completion
        boolean failOnBadVersion = false;

        for (int i = 0; i < paths.size(); i++) {
          ZkAsyncCallbacks.SetDataCallbackHandler cb = cbList[i];
          if (cb == null) {
            continue;
          }

          cb.waitForSuccess();

          switch (Code.get(cb.getRc())) {
            case OK:
              updateData.set(i, newDataList.get(i));
              setStats.set(i, cb.getStat());
              needUpdate[i] = false;
              break;
            case NONODE:
              failOnNoNode = true;
              needCreate[i] = true;
              break;
            case BADVERSION:
              failOnBadVersion = true;
              break;
            default:
              // if fail on error other than NoNode or BadVersion
              // will not retry
              needUpdate[i] = false;
              break;
          }
        }

        // if failOnNoNode, try create
        if (failOnNoNode) {
          createCbList = create(paths, newDataList, needCreate, pathsCreated, options);
          for (int i = 0; i < paths.size(); i++) {
            ZkAsyncCallbacks.CreateCallbackHandler createCb = createCbList[i];
            if (createCb == null) {
              continue;
            }

            switch (Code.get(createCb.getRc())) {
              case OK:
                needUpdate[i] = false;
                updateData.set(i, newDataList.get(i));
                setStats.set(i, ZNode.ZERO_STAT);
                break;
              case NODEEXISTS:
                retry = true;
                break;
              default:
                // if fail on error other than NodeExists
                // will not retry
                needUpdate[i] = false;
                break;
            }
          }
        }

        // if failOnBadVersion, retry
        if (failOnBadVersion) {
          retry = true;
        }
      } while (retry);

      if (stats != null) {
        stats.clear();
        stats.addAll(setStats);
      }

      return updateData;
    } finally {
      long endT = System.nanoTime();
      if (LOG.isTraceEnabled()) {
        LOG.trace(
            "setData_async, size: " + paths.size() + ", paths: " + paths.get(0) + ",... time: " + (
                endT - startT) + " ns");
      }
    }
  }