in helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java [539:637]
private static boolean compareAndSetZnode(ZnodeValue expect, ZnodeOpArg arg, HelixZkClient zkClient,
ZNRecord diff) {
String path = arg._znodePath;
ZnodePropertyType type = arg._propertyType;
String key = arg._key;
boolean success = true;
// retry 3 times in case there are write conflicts
long backoffTime = 20; // ms
for (int i = 0; i < 3; i++) {
try {
Stat stat = new Stat();
ZNRecord record = zkClient.<ZNRecord> readDataAndStat(path, stat, true);
if (isValueExpected(record, type, key, expect, diff)) {
if (arg._operation.compareTo("+") == 0) {
if (record == null) {
record = new ZNRecord("default");
}
ZnodeModValueType valueType = getValueType(arg._propertyType, arg._key);
switch (valueType) {
case SINGLE_VALUE:
setSingleValue(record, arg._propertyType, arg._key, arg._updateValue._singleValue);
break;
case LIST_VALUE:
setListValue(record, arg._key, arg._updateValue._listValue);
break;
case MAP_VALUE:
setMapValue(record, arg._key, arg._updateValue._mapValue);
break;
case ZNODE_VALUE:
// deep copy
record =
ZNRECORD_SERIALIZER.deserialize(ZNRECORD_SERIALIZER
.serialize(arg._updateValue._znodeValue));
break;
case INVALID:
break;
default:
break;
}
} else if (arg._operation.compareTo("-") == 0) {
ZnodeModValueType valueType = getValueType(arg._propertyType, arg._key);
switch (valueType) {
case SINGLE_VALUE:
removeSingleValue(record, arg._propertyType, arg._key);
break;
case LIST_VALUE:
removeListValue(record, arg._key);
break;
case MAP_VALUE:
removeMapValue(record, arg._key);
break;
case ZNODE_VALUE:
record = null;
break;
case INVALID:
break;
default:
break;
}
} else {
logger.warn("fail to execute (unsupport operation): " + arg._operation);
success = false;
}
if (success == true) {
if (record == null) {
zkClient.delete(path);
} else {
try {
zkClient.createPersistent(path, true);
} catch (ZkNodeExistsException e) {
// OK
}
zkClient.writeData(path, record, stat.getVersion());
}
return true;
} else {
return false;
}
}
} catch (ZkBadVersionException e) {
// e.printStackTrace();
} catch (PropertyStoreException e) {
// e.printStackTrace();
}
try {
Thread.sleep(backoffTime);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
backoffTime *= 2;
}
return false;
}